From 880d77a8746dabec4b65a86dd6f7eeacf2ee5cf2 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Mar 2022 11:32:30 +0200 Subject: [PATCH 01/29] Refactor: review/normalize (exception) logging --- lib/logstash/outputs/tcp.rb | 39 +++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 3b06465..ff6e56a 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -65,8 +65,7 @@ def run begin @socket.write(@queue.pop) rescue => e - @logger.warn("tcp output exception", :socket => @socket, - :exception => e) + log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) break end end @@ -119,8 +118,7 @@ def register begin @server_socket = TCPServer.new(@host, @port) rescue Errno::EADDRINUSE - @logger.error("Could not start TCP server: Address in use", - :host => @host, :port => @port) + @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) raise end if @ssl_enable @@ -133,8 +131,7 @@ def register Thread.start(server_socket.accept) do |client_socket| # monkeypatch a 'peer' method onto the socket. client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Accepted connection", :client => client_socket.peer, - :server => "#{@host}:#{@port}") + @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") client = Client.new(client_socket, @logger) Thread.current[:client] = client @client_threads << Thread.current @@ -163,8 +160,7 @@ def register # Now send the payload client_socket.syswrite(payload) if w.any? rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, - :exception => e, :backtrace => e.backtrace) + log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) client_socket.close rescue nil client_socket = nil sleep @reconnect_interval @@ -183,17 +179,17 @@ def connect begin client_socket.connect rescue OpenSSL::SSL::SSLError => ssle - @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace) + log_error 'connect ssl failure:', ssle, backtrace: true # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) raise end end client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("Opened connection", :client => "#{client_socket.peer}") + @logger.debug("opened connection", :client => client_socket.peer) return client_socket - rescue StandardError => e - @logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace) + rescue => e + log_error 'failed to connect:', e sleep @reconnect_interval retry end @@ -208,4 +204,23 @@ def server? def receive(event) @codec.encode(event) end # def receive + + private + + def log_warn(msg, e, details = {}) + details = details.merge message: e.message, exception: e.class + if details[:backtrace] || (details[:backtrace].nil? && @logger.debug?) + details[:backtrace] = e.backtrace + end + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: nil) + details = { message: e.message, exception: e.class } + if backtrace || (backtrace.nil? && @logger.debug?) + details[:backtrace] = e.backtrace + end + @logger.error(msg, details) + end + end # class LogStash::Outputs::Tcp From d315921363161b2a574030ac5306e359d2f696e4 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Mar 2022 11:44:31 +0200 Subject: [PATCH 02/29] Refactor: code for improved readability --- lib/logstash/outputs/tcp.rb | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index ff6e56a..76bfe90 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -52,14 +52,13 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base config :ssl_key_passphrase, :validate => :password, :default => nil class Client - public + def initialize(socket, logger) @socket = socket @logger = logger @queue = Queue.new end - public def run loop do begin @@ -71,13 +70,11 @@ def run end end # def run - public def write(msg) @queue.push(msg) end # def write end # class Client - private def setup_ssl require "openssl" @@ -104,8 +101,9 @@ def setup_ssl @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end end # def setup_ssl + private :setup_ssl - public + # @overload Base#register def register require "socket" require "stud/try" @@ -168,9 +166,15 @@ def register end end end - end # def register + end + + # @overload Base#receive + def receive(event) + @codec.encode(event) + end private + def connect begin client_socket = TCPSocket.new(@host, @port) @@ -195,18 +199,10 @@ def connect end end # def connect - private def server? @mode == "server" end # def server? - public - def receive(event) - @codec.encode(event) - end # def receive - - private - def log_warn(msg, e, details = {}) details = details.merge message: e.message, exception: e.class if details[:backtrace] || (details[:backtrace].nil? && @logger.debug?) From ce5a90ca52e8cc6fc28bae742cae80edbfb7efd2 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Mar 2022 14:49:54 +0200 Subject: [PATCH 03/29] Test: add some happy/error path tests --- spec/outputs/tcp_spec.rb | 58 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index d5494b8..e8b42c7 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -4,10 +4,60 @@ describe LogStash::Outputs::Tcp do subject { described_class.new(config) } - let(:config) { { - "host" => "localhost", - "port" => 2000 + rand(3000), - } } + + let(:port) do + begin + # Start high to better avoid common services + port = rand(10000..65535) + s = TCPServer.new("127.0.0.1", port) + s.close + + port + rescue Errno::EADDRINUSE + retry + end + end + + let(:config) { { "host" => "localhost", "port" => port } } + + let(:event) { LogStash::Event.new('message' => 'foo bar') } + + context 'failing to connect' do + + before { subject.register } + + let(:config) { super().merge 'port' => 1000 } + + it 'fails to connect' do + expect( subject ).to receive(:log_error).and_call_original + Thread.start { subject.receive(event) } + sleep 1.0 + end + + end + + context 'server mode' do + + before { subject.register } + + let(:config) { super().merge 'mode' => 'server' } + + let(:client) do + Stud::try(3.times) { TCPSocket.new("127.0.0.1", port) } + end + + after { client.close } + + it 'receives serialized data' do + client # connect + Thread.start { sleep 0.5; subject.receive event } + + read = client.recv(1000) + expect( read.size ).to be > 0 + expect( JSON.parse(read)['message'] ).to eql 'foo bar' + end + + end context "when enabling SSL" do let(:config) { super().merge("ssl_enable" => true) } From 30ddbe7fde1edd42872635a35bcdf3f8da3ad005 Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 29 Mar 2022 15:31:50 +0200 Subject: [PATCH 04/29] a close operation should release client sockets --- lib/logstash/outputs/tcp.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 76bfe90..043fcaf 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -73,6 +73,14 @@ def run def write(msg) @queue.push(msg) end # def write + + def close + begin + @socket.close + rescue => e + log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) + end + end end # class Client def setup_ssl @@ -173,6 +181,16 @@ def receive(event) @codec.encode(event) end + # @overload Base#close + def close + return unless @client_threads + + @client_threads.each do |thread| + client = thread[:client] + client.close if client + end + end + private def connect From 87afddce46c6751c7494d425aabaeaa1e0f3d5f0 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:15:34 +0200 Subject: [PATCH 05/29] Feat: ssl_supported_protocols option --- lib/logstash/outputs/tcp.rb | 17 ++++++++++- spec/outputs/tcp_spec.rb | 60 +++++++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 043fcaf..0f9637e 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -51,6 +51,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil + # NOTE: the default setting [] uses SSL engine defaults + config :ssl_supported_protocols, :validate => ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'], :default => [], :list => true + class Client def initialize(socket, logger) @@ -86,7 +89,7 @@ def close def setup_ssl require "openssl" - @ssl_context = OpenSSL::SSL::SSLContext.new + @ssl_context = new_ssl_context if @ssl_cert @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) if @ssl_key @@ -108,9 +111,21 @@ def setup_ssl @ssl_context.cert_store = @cert_store @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end + + if ssl_supported_protocols.any? + min_max_version = ssl_supported_protocols.sort.map { |n| n.sub('v', '').sub('.', '_').to_sym } # 'TLSv1.2' => :TLS1_2 + @ssl_context.min_version = min_max_version.first + @ssl_context.max_version = min_max_version.last + end end # def setup_ssl private :setup_ssl + # @note to be able to hook up into #ssl_context from tests + def new_ssl_context + OpenSSL::SSL::SSLContext.new + end + private :new_ssl_context + # @overload Base#register def register require "socket" diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index e8b42c7..68e7947 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -18,6 +18,8 @@ end end + let(:server) { TCPServer.new("127.0.0.1", port) } + let(:config) { { "host" => "localhost", "port" => port } } let(:event) { LogStash::Event.new('message' => 'foo bar') } @@ -46,7 +48,7 @@ Stud::try(3.times) { TCPSocket.new("127.0.0.1", port) } end - after { client.close } + after { subject.close } it 'receives serialized data' do client # connect @@ -60,7 +62,7 @@ end context "when enabling SSL" do - let(:config) { super().merge("ssl_enable" => true) } + let(:config) { super().merge("ssl_enable" => true, 'codec' => 'plain') } context "and not providing a certificate/key pair" do it "registers without error" do expect { subject.register }.to_not raise_error @@ -101,6 +103,60 @@ end end + + let(:secure_server) do + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE + ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(crt_file)) + ssl_context.key = OpenSSL::PKey::RSA.new(File.read(key_file), nil) + ssl_context.ssl_version = server_ssl_version if server_ssl_version + ssl_context.min_version = server_min_version if server_min_version + OpenSSL::SSL::SSLServer.new(server, ssl_context) + end + + let(:server_min_version) { nil } + let(:server_ssl_version) { nil } + + context 'with supported protocol' do + + let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.2']) } + + let(:server_min_version) { 'TLS1_2' } + + before { subject.register } + after { secure_server.close } + + it 'reads plain data' do + Thread.start { sleep 0.25; subject.receive event } + socket = secure_server.accept + read = socket.sysread(100) + expect( read.size ).to be > 0 + expect( read ).to end_with 'foo bar' + end + + end + + context 'with unsupported protocol (on server)' do + + let(:config) { super().merge("ssl_supported_protocols" => ['TLSv1.1']) } + + let(:server_min_version) { 'TLS1_2' } + + before { subject.register } + after { secure_server.close } + + it 'fails (and loops retrying)' do + expect(subject.logger).to receive(:error).with(/connect ssl failure/i, hash_including(message: /No appropriate protocol/i)).and_call_original + expect(subject.logger).to receive(:error).with(/failed to connect/i, hash_including(exception: OpenSSL::SSL::SSLError)).and_call_original + expect(subject).to receive(:sleep).once.and_call_original + expect(subject).to receive(:sleep).once.and_throw :TEST_DONE # to be able to abort the retry loop + + Thread.start { secure_server.accept rescue nil } + expect { subject.receive event }.to throw_symbol(:TEST_DONE) + end + + end + end context "encrypted key using PKCS#1" do From dd4072f28aac5a012c983657654a7ca36042a40d Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:16:03 +0200 Subject: [PATCH 06/29] Refactor: less noise on warnings --- lib/logstash/outputs/tcp.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 0f9637e..33a43c6 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -216,7 +216,7 @@ def connect begin client_socket.connect rescue OpenSSL::SSL::SSLError => ssle - log_error 'connect ssl failure:', ssle, backtrace: true + log_error 'connect ssl failure:', ssle, backtrace: false # NOTE(mrichar1): Hack to prevent hammering peer sleep(5) raise @@ -246,7 +246,7 @@ def log_warn(msg, e, details = {}) def log_error(msg, e, backtrace: nil) details = { message: e.message, exception: e.class } - if backtrace || (backtrace.nil? && @logger.debug?) + if backtrace || (backtrace.nil? && @logger.info?) details[:backtrace] = e.backtrace end @logger.error(msg, details) From 4490a22b01118e8693e276fd18734d79dfc6c987 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:44:29 +0200 Subject: [PATCH 07/29] Docs: copy-pasta for ssl_supported_protocols --- docs/index.asciidoc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 4df55a2..4db2112 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -45,6 +45,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |a valid filesystem path|No | <> |<>|No +| <> |<>|No | <> |<>|No |======================================================================= @@ -130,6 +131,23 @@ SSL key path SSL key passphrase +[id="plugins-{type}s-{plugin}-ssl_supported_protocols"] +===== `ssl_supported_protocols` + + * Value type is <> + * Allowed values are: `'TLSv1.1'`, `'TLSv1.2'`, `'TLSv1.3'` + * Default depends on the JDK being used. With up-to-date Logstash, the default is `['TLSv1.2', 'TLSv1.3']`. + `'TLSv1.1'` is not considered secure and is only provided for legacy applications. + +List of allowed SSL/TLS versions to use when establishing a secure connection. + +For Java 8 `'TLSv1.3'` is supported only since **8u262** (AdoptOpenJDK), but requires that you set the +`LS_JAVA_OPTS="-Djdk.tls.client.protocols=TLSv1.3"` system property in Logstash. + +NOTE: If you configure the plugin to use `'TLSv1.1'` on any recent JVM, such as the one packaged with Logstash, +the protocol is disabled by default and needs to be enabled manually by changing `jdk.tls.disabledAlgorithms` in +the *$JDK_HOME/conf/security/java.security* configuration file. That is, `TLSv1.1` needs to be removed from the list. + [id="plugins-{type}s-{plugin}-ssl_verify"] ===== `ssl_verify` From 33b61ae43644bae0586460e84f7878fc36f3ba24 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:45:07 +0200 Subject: [PATCH 08/29] jruby-openssl >= 0.12.2 dependency needed --- logstash-output-tcp.gemspec | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index b65fe8d..a9572a0 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -25,7 +25,17 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'stud' + # NOTE: be able to test against LS 7.x / 8.x + # (some LS versions have strict jruby-openssl requirements in logstash-core e.g. "= 0.11.0") + if ENV['ELASTIC_CONTAINER'].eql?('true') && ENV['PWD'].eql?('/usr/share/plugins/plugin') + # detected running under CI - ENV['CI'] is not enough e.g. during `docker-compose build` + else + # we depend on bouncycastle's bcpkix-jdk15on being on the class-path + s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 + end + s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'logstash-codec-plain' s.add_development_dependency 'flores' end From 7cae392ff4f469a13bdef7c1dfbc58d32894a1db Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:45:15 +0200 Subject: [PATCH 09/29] minor --- spec/outputs/tcp_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index 68e7947..f394717 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -111,10 +111,12 @@ ssl_context.key = OpenSSL::PKey::RSA.new(File.read(key_file), nil) ssl_context.ssl_version = server_ssl_version if server_ssl_version ssl_context.min_version = server_min_version if server_min_version + ssl_context.max_version = server_max_version if server_max_version OpenSSL::SSL::SSLServer.new(server, ssl_context) end let(:server_min_version) { nil } + let(:server_max_version) { nil } let(:server_ssl_version) { nil } context 'with supported protocol' do From 63e7813653b46709615d15dca9e61bef656d0a99 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:54:35 +0200 Subject: [PATCH 10/29] Test: unit test for ssl_supported_protocols --- spec/outputs/tcp_spec.rb | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index f394717..68e03bb 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -61,6 +61,40 @@ end + context "with forced protocol" do + let(:config) do + super().merge 'ssl_supported_protocols' => [ 'TLSv1.1' ] + end + + before do + ssl_context = OpenSSL::SSL::SSLContext.new + allow(subject).to receive(:new_ssl_context).and_return(ssl_context) + expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').and_call_original + expect(ssl_context).to receive(:max_version=).with(:'TLS1_1').and_call_original + end + + it "sets min/max version" do + subject.send :setup_ssl + end + end + + context "with protocol range" do + let(:config) do + super().merge 'ssl_supported_protocols' => [ 'TLSv1.3', 'TLSv1.1', 'TLSv1.2' ] + end + + before do + ssl_context = OpenSSL::SSL::SSLContext.new + allow(subject).to receive(:new_ssl_context).and_return(ssl_context) + expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').and_call_original + expect(ssl_context).to receive(:max_version=).with(:'TLS1_3').and_call_original + end + + it "sets min/max version" do + subject.send :setup_ssl + end + end + context "when enabling SSL" do let(:config) { super().merge("ssl_enable" => true, 'codec' => 'plain') } context "and not providing a certificate/key pair" do From 5909151a85565d4f53e9c69c2f102b8ff549fd7d Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 09:54:47 +0200 Subject: [PATCH 11/29] a note on the current limitation --- docs/index.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 4db2112..c7b862c 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -140,6 +140,7 @@ SSL key passphrase `'TLSv1.1'` is not considered secure and is only provided for legacy applications. List of allowed SSL/TLS versions to use when establishing a secure connection. +The plugin has a limitation and can only enable a range of protocols, e.g. setting `['TLSv1.3', 'TLSv1.1']` does not disable `TLSv1.2`. For Java 8 `'TLSv1.3'` is supported only since **8u262** (AdoptOpenJDK), but requires that you set the `LS_JAVA_OPTS="-Djdk.tls.client.protocols=TLSv1.3"` system property in Logstash. From 3cca9ead374a67bcf7e25a9aff1ee57f957b0093 Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 10:03:19 +0200 Subject: [PATCH 12/29] Test: JSON dependency --- spec/outputs/tcp_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index 68e03bb..d4e6425 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -50,7 +50,7 @@ after { subject.close } - it 'receives serialized data' do + it 'receives serialized data' do; require 'json' client # connect Thread.start { sleep 0.5; subject.receive event } From d3912548768c8b62db5dbd01bacc05b7887e998e Mon Sep 17 00:00:00 2001 From: kares Date: Wed, 30 Mar 2022 11:10:29 +0200 Subject: [PATCH 13/29] Test: let's just skip the test on 6.x --- spec/outputs/tcp_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index d4e6425..a749340 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -191,7 +191,7 @@ expect { subject.receive event }.to throw_symbol(:TEST_DONE) end - end + end if LOGSTASH_VERSION > '7.0' end From dbdf66d67e1f4ff79313e56809bd84ba5b783356 Mon Sep 17 00:00:00 2001 From: kares Date: Mon, 18 Apr 2022 14:36:04 +0200 Subject: [PATCH 14/29] Revert test container detection hack --- logstash-output-tcp.gemspec | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index a9572a0..d5aba7a 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -25,14 +25,8 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'stud' - # NOTE: be able to test against LS 7.x / 8.x - # (some LS versions have strict jruby-openssl requirements in logstash-core e.g. "= 0.11.0") - if ENV['ELASTIC_CONTAINER'].eql?('true') && ENV['PWD'].eql?('/usr/share/plugins/plugin') - # detected running under CI - ENV['CI'] is not enough e.g. during `docker-compose build` - else - # we depend on bouncycastle's bcpkix-jdk15on being on the class-path - s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 - end + # we depend on bouncycastle's bcpkix-jdk15on being on the class-path + s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'logstash-codec-plain' From 37376574f016574e545e91a47ca65ccc11d59ae8 Mon Sep 17 00:00:00 2001 From: kares Date: Mon, 18 Apr 2022 14:54:51 +0200 Subject: [PATCH 15/29] bump + changelog --- CHANGELOG.md | 3 +++ logstash-output-tcp.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f6ebb..b5375cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.0 + - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47) + ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index d5aba7a..7e0d8b8 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.0.2' + s.version = '6.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From b2210b65f9b13ab4df6158a92da79a3ee7e797f5 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 16:14:03 +0200 Subject: [PATCH 16/29] require latest LS 8.1 due jruby-openssl pinning --- .travis.yml | 8 +++++++- logstash-output-tcp.gemspec | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index a50fc73..acf7ac5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,8 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file + - logstash-plugins/.ci:travis/defaults.yml@1.x + - logstash-plugins/.ci:travis/exec.yml@1.x + +env: + jobs: + - ELASTIC_STACK_VERSION=8.x + - SNAPSHOT=true ELASTIC_STACK_VERSION=8.x \ No newline at end of file diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index 7e0d8b8..a02c1be 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -21,7 +21,8 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - + s.add_runtime_dependency 'logstash-core', '>= 8.1.0' + s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'stud' From 0c94ec0636261106e8bcbf403cca1f44eee956cd Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 16:16:39 +0200 Subject: [PATCH 17/29] redundant docs related to Java 8 --- docs/index.asciidoc | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index c7b862c..0123e8a 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -142,9 +142,6 @@ SSL key passphrase List of allowed SSL/TLS versions to use when establishing a secure connection. The plugin has a limitation and can only enable a range of protocols, e.g. setting `['TLSv1.3', 'TLSv1.1']` does not disable `TLSv1.2`. -For Java 8 `'TLSv1.3'` is supported only since **8u262** (AdoptOpenJDK), but requires that you set the -`LS_JAVA_OPTS="-Djdk.tls.client.protocols=TLSv1.3"` system property in Logstash. - NOTE: If you configure the plugin to use `'TLSv1.1'` on any recent JVM, such as the one packaged with Logstash, the protocol is disabled by default and needs to be enabled manually by changing `jdk.tls.disabledAlgorithms` in the *$JDK_HOME/conf/security/java.security* configuration file. That is, `TLSv1.1` needs to be removed from the list. From 0d68ece9410c06a4a809a3076b81db66dd4f6fb3 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 28 Apr 2022 16:18:04 +0200 Subject: [PATCH 18/29] one only --- logstash-output-tcp.gemspec | 2 -- 1 file changed, 2 deletions(-) diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index a02c1be..14be8f9 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -22,11 +22,9 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-core', '>= 8.1.0' - s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'stud' - # we depend on bouncycastle's bcpkix-jdk15on being on the class-path s.add_runtime_dependency 'jruby-openssl', '>= 0.12.2' # 0.12 supports TLSv1.3 s.add_development_dependency 'logstash-devutils' From 53e138d91a0cad9c5edb34206bbd852f3792a58b Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Thu, 5 May 2022 13:02:33 +0200 Subject: [PATCH 19/29] Update lib/logstash/outputs/tcp.rb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/outputs/tcp.rb | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 33a43c6..dbed489 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -78,11 +78,9 @@ def write(msg) end # def write def close - begin - @socket.close - rescue => e - log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) - end + @socket.close + rescue => e + log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) end end # class Client From d5b7dc3dd263f3c4460b8579f017ec8e41f51daa Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 13:55:20 +0200 Subject: [PATCH 20/29] remove limitation on setting only ranges with a necessary work-around for JOSSL --- docs/index.asciidoc | 1 - lib/logstash/outputs/tcp.rb | 18 ++++++++++------- spec/outputs/tcp_spec.rb | 39 ++++++++++++++++++++++++------------- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 0123e8a..b5fd844 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -140,7 +140,6 @@ SSL key passphrase `'TLSv1.1'` is not considered secure and is only provided for legacy applications. List of allowed SSL/TLS versions to use when establishing a secure connection. -The plugin has a limitation and can only enable a range of protocols, e.g. setting `['TLSv1.3', 'TLSv1.1']` does not disable `TLSv1.2`. NOTE: If you configure the plugin to use `'TLSv1.1'` on any recent JVM, such as the one packaged with Logstash, the protocol is disabled by default and needs to be enabled manually by changing `jdk.tls.disabledAlgorithms` in diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index dbed489..fb5030b 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -110,12 +110,18 @@ def setup_ssl @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end + @ssl_context.min_version = :TLS1_1 # not strictly required - JVM should have disabled TLSv1 if ssl_supported_protocols.any? - min_max_version = ssl_supported_protocols.sort.map { |n| n.sub('v', '').sub('.', '_').to_sym } # 'TLSv1.2' => :TLS1_2 - @ssl_context.min_version = min_max_version.first - @ssl_context.max_version = min_max_version.last + disabled_protocols = ['TLSv1.1', 'TLSv1.2', 'TLSv1.3'] - ssl_supported_protocols + unless OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 # work-around JRuby-OpenSSL bug - missing constant + @ssl_context.max_version = :TLS1_2 if disabled_protocols.delete('TLSv1.3') + end + # mapping 'TLSv1.2' -> OpenSSL::SSL::OP_NO_TLSv1_2 + disabled_protocols.map! { |v| OpenSSL::SSL.const_get "OP_NO_#{v.sub('.', '_')}" } + @ssl_context.options = disabled_protocols.inject(@ssl_context.options) { |options, op_no| options | op_no } end - end # def setup_ssl + @ssl_context + end private :setup_ssl # @note to be able to hook up into #ssl_context from tests @@ -128,9 +134,7 @@ def new_ssl_context def register require "socket" require "stud/try" - if @ssl_enable - setup_ssl - end # @ssl_enable + setup_ssl if @ssl_enable if server? @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index a749340..b559766 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -66,15 +66,20 @@ super().merge 'ssl_supported_protocols' => [ 'TLSv1.1' ] end - before do - ssl_context = OpenSSL::SSL::SSLContext.new - allow(subject).to receive(:new_ssl_context).and_return(ssl_context) - expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').and_call_original - expect(ssl_context).to receive(:max_version=).with(:'TLS1_1').and_call_original - end - - it "sets min/max version" do - subject.send :setup_ssl + it "limits protocol selection" do + if OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_3).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + else + ssl_context = OpenSSL::SSL::SSLContext.new + allow(subject).to receive(:new_ssl_context).and_return(ssl_context) + expect(ssl_context).to receive(:max_version=).with(:'TLS1_2').and_call_original + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to_not eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + end end end @@ -83,14 +88,22 @@ super().merge 'ssl_supported_protocols' => [ 'TLSv1.3', 'TLSv1.1', 'TLSv1.2' ] end - before do + it "does not limit protocol selection (except min_version)" do ssl_context = OpenSSL::SSL::SSLContext.new allow(subject).to receive(:new_ssl_context).and_return(ssl_context) expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').and_call_original - expect(ssl_context).to receive(:max_version=).with(:'TLS1_3').and_call_original - end - it "sets min/max version" do + if OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_3).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + else + ssl_context = subject.send :setup_ssl + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 + expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 + end + subject.send :setup_ssl end end From 1193e63e9cc6ad9935ac9f1a57d0fbad7a2ad782 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 15:05:06 +0200 Subject: [PATCH 21/29] might happen multiple times --- spec/outputs/tcp_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/outputs/tcp_spec.rb b/spec/outputs/tcp_spec.rb index b559766..0708cbf 100644 --- a/spec/outputs/tcp_spec.rb +++ b/spec/outputs/tcp_spec.rb @@ -91,15 +91,15 @@ it "does not limit protocol selection (except min_version)" do ssl_context = OpenSSL::SSL::SSLContext.new allow(subject).to receive(:new_ssl_context).and_return(ssl_context) - expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').and_call_original + expect(ssl_context).to receive(:min_version=).with(:'TLS1_1').at_least(1).and_call_original if OpenSSL::SSL.const_defined? :OP_NO_TLSv1_3 - ssl_context = subject.send :setup_ssl + subject.send :setup_ssl expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_3).to eql 0 expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 else - ssl_context = subject.send :setup_ssl + subject.send :setup_ssl expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_2).to eql 0 expect(ssl_context.options & OpenSSL::SSL::OP_NO_TLSv1_1).to eql 0 end From c063e2e89bb6e926c2d8f1c07e99d5b761c90e8e Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 16:23:05 +0200 Subject: [PATCH 22/29] close em all! --- CHANGELOG.md | 1 + lib/logstash/outputs/tcp.rb | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5375cd..21aab5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## 6.1.0 - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47) + - Fix: close server and client sockets on plugin close ## 6.0.2 - Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index fb5030b..1c9b652 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -200,11 +200,12 @@ def receive(event) # @overload Base#close def close - return unless @client_threads + @server_socket.close rescue nil if @server_socket + return unless @client_threads @client_threads.each do |thread| client = thread[:client] - client.close if client + client.close rescue nil if client end end From 051ae4c401d91a0cbbc07f22263dbbe3a763e76d Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 16:28:30 +0200 Subject: [PATCH 23/29] Refactor: logging helpers to do ~ same --- lib/logstash/outputs/tcp.rb | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 1c9b652..eddd018 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -239,19 +239,15 @@ def server? @mode == "server" end # def server? - def log_warn(msg, e, details = {}) + def log_warn(msg, e, backtrace: @logger.debug?, **details) details = details.merge message: e.message, exception: e.class - if details[:backtrace] || (details[:backtrace].nil? && @logger.debug?) - details[:backtrace] = e.backtrace - end + details[:backtrace] = e.backtrace if backtrace @logger.warn(msg, details) end - def log_error(msg, e, backtrace: nil) - details = { message: e.message, exception: e.class } - if backtrace || (backtrace.nil? && @logger.info?) - details[:backtrace] = e.backtrace - end + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace @logger.error(msg, details) end From 69115c23cd03f7eb1232c653c282c83567ed1e2c Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 16:57:15 +0200 Subject: [PATCH 24/29] (more) thread-safe closing of client threads --- lib/logstash/outputs/tcp.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index eddd018..4316be9 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -134,6 +134,7 @@ def new_ssl_context def register require "socket" require "stud/try" + @closed = Concurrent::AtomicBoolean.new(false) setup_ssl if @ssl_enable if server? @@ -147,10 +148,11 @@ def register if @ssl_enable @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) end # @ssl_enable - @client_threads = [] + @client_threads = Concurrent::Array.new @accept_thread = Thread.new(@server_socket) do |server_socket| loop do + break if @closed.value Thread.start(server_socket.accept) do |client_socket| # monkeypatch a 'peer' method onto the socket. client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } @@ -158,16 +160,15 @@ def register client = Client.new(client_socket, @logger) Thread.current[:client] = client @client_threads << Thread.current - client.run + client.run unless @closed.value end end end @codec.on_event do |event, payload| - @client_threads.each do |client_thread| + @client_threads.select(&:alive?).each do |client_thread| client_thread[:client].write(payload) end - @client_threads.reject! {|t| !t.alive? } end else client_socket = nil @@ -200,6 +201,7 @@ def receive(event) # @overload Base#close def close + @closed.make_true @server_socket.close rescue nil if @server_socket return unless @client_threads From b77567412628dc911b75966fb76db64bfb1e0f1d Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 17:31:24 +0200 Subject: [PATCH 25/29] nonblock accept - to not be waiting while closing --- lib/logstash/outputs/tcp.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 4316be9..9965707 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -153,7 +153,12 @@ def register @accept_thread = Thread.new(@server_socket) do |server_socket| loop do break if @closed.value - Thread.start(server_socket.accept) do |client_socket| + client_socket = server_socket.accept_nonblock exception: false + if client_socket == :wait_readable + IO.select [ server_socket ] + next + end + Thread.start(client_socket) do |client_socket| # monkeypatch a 'peer' method onto the socket. client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") From 7b4d61fb60a06297273393a7e0e4d9577538b980 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 5 May 2022 17:39:14 +0200 Subject: [PATCH 26/29] a bit of thead naming convention - yay! --- lib/logstash/outputs/tcp.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 9965707..be644de 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -151,6 +151,7 @@ def register @client_threads = Concurrent::Array.new @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{id}]|output|tcp|server_accept") loop do break if @closed.value client_socket = server_socket.accept_nonblock exception: false @@ -164,6 +165,7 @@ def register @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") client = Client.new(client_socket, @logger) Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{id}]|output|tcp|client_socket-#{@client_threads.size}") @client_threads << Thread.current client.run unless @closed.value end From c4f162884f571e66a271294c926d93837734e4cd Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Mon, 9 May 2022 08:03:34 +0200 Subject: [PATCH 27/29] Update lib/logstash/outputs/tcp.rb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/outputs/tcp.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index be644de..eee9cb4 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -118,7 +118,7 @@ def setup_ssl end # mapping 'TLSv1.2' -> OpenSSL::SSL::OP_NO_TLSv1_2 disabled_protocols.map! { |v| OpenSSL::SSL.const_get "OP_NO_#{v.sub('.', '_')}" } - @ssl_context.options = disabled_protocols.inject(@ssl_context.options) { |options, op_no| options | op_no } + @ssl_context.options = disabled_protocols.reduce(@ssl_context.options, :|) end @ssl_context end From 656a9af234f2e1ddfc451f73a406022b23a12438 Mon Sep 17 00:00:00 2001 From: kares Date: Mon, 9 May 2022 09:24:49 +0200 Subject: [PATCH 28/29] restore filtering out dead threads from i-var --- lib/logstash/outputs/tcp.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index eee9cb4..40cbeef 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -173,7 +173,8 @@ def register end @codec.on_event do |event, payload| - @client_threads.select(&:alive?).each do |client_thread| + @client_threads.select!(&:alive?) + @client_threads.each do |client_thread| client_thread[:client].write(payload) end end From 5481865a8cae78415e80fe756b25c4a96f4cdfa6 Mon Sep 17 00:00:00 2001 From: kares Date: Mon, 9 May 2022 09:49:21 +0200 Subject: [PATCH 29/29] Review: use pipeline_id when naming threads --- lib/logstash/outputs/tcp.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 40cbeef..77cb1ab 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -151,7 +151,7 @@ def register @client_threads = Concurrent::Array.new @accept_thread = Thread.new(@server_socket) do |server_socket| - LogStash::Util.set_thread_name("[#{id}]|output|tcp|server_accept") + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") loop do break if @closed.value client_socket = server_socket.accept_nonblock exception: false @@ -165,7 +165,7 @@ def register @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") client = Client.new(client_socket, @logger) Thread.current[:client] = client - LogStash::Util.set_thread_name("[#{id}]|output|tcp|client_socket-#{@client_threads.size}") + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") @client_threads << Thread.current client.run unless @closed.value end @@ -249,6 +249,10 @@ def server? @mode == "server" end # def server? + def pipeline_id + execution_context.pipeline_id || 'main' + end + def log_warn(msg, e, backtrace: @logger.debug?, **details) details = details.merge message: e.message, exception: e.class details[:backtrace] = e.backtrace if backtrace