@@ -51,38 +51,43 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
5151 # SSL key passphrase
5252 config :ssl_key_passphrase , :validate => :password , :default => nil
5353
54+ # NOTE: the default setting [] uses SSL engine defaults
55+ config :ssl_supported_protocols , :validate => [ 'TLSv1.1' , 'TLSv1.2' , 'TLSv1.3' ] , :default => [ ] , :list => true
56+
5457 class Client
55- public
58+
5659 def initialize ( socket , logger )
5760 @socket = socket
5861 @logger = logger
5962 @queue = Queue . new
6063 end
6164
62- public
6365 def run
6466 loop do
6567 begin
6668 @socket . write ( @queue . pop )
6769 rescue => e
68- @logger . warn ( "tcp output exception" , :socket => @socket ,
69- :exception => e )
70+ log_warn 'socket write failed:' , e , socket : ( @socket ? @socket . to_s : nil )
7071 break
7172 end
7273 end
7374 end # def run
7475
75- public
7676 def write ( msg )
7777 @queue . push ( msg )
7878 end # def write
79+
80+ def close
81+ @socket . close
82+ rescue => e
83+ log_warn 'socket close failed:' , e , socket : ( @socket ? @socket . to_s : nil )
84+ end
7985 end # class Client
8086
81- private
8287 def setup_ssl
8388 require "openssl"
8489
85- @ssl_context = OpenSSL :: SSL :: SSLContext . new
90+ @ssl_context = new_ssl_context
8691 if @ssl_cert
8792 @ssl_context . cert = OpenSSL ::X509 ::Certificate . new ( File . read ( @ssl_cert ) )
8893 if @ssl_key
@@ -104,50 +109,74 @@ def setup_ssl
104109 @ssl_context . cert_store = @cert_store
105110 @ssl_context . verify_mode = OpenSSL ::SSL ::VERIFY_PEER |OpenSSL ::SSL ::VERIFY_FAIL_IF_NO_PEER_CERT
106111 end
107- end # def setup_ssl
108112
109- public
113+ @ssl_context . min_version = :TLS1_1 # not strictly required - JVM should have disabled TLSv1
114+ if ssl_supported_protocols . any?
115+ disabled_protocols = [ 'TLSv1.1' , 'TLSv1.2' , 'TLSv1.3' ] - ssl_supported_protocols
116+ unless OpenSSL ::SSL . const_defined? :OP_NO_TLSv1_3 # work-around JRuby-OpenSSL bug - missing constant
117+ @ssl_context . max_version = :TLS1_2 if disabled_protocols . delete ( 'TLSv1.3' )
118+ end
119+ # mapping 'TLSv1.2' -> OpenSSL::SSL::OP_NO_TLSv1_2
120+ disabled_protocols . map! { |v | OpenSSL ::SSL . const_get "OP_NO_#{ v . sub ( '.' , '_' ) } " }
121+ @ssl_context . options = disabled_protocols . reduce ( @ssl_context . options , :| )
122+ end
123+ @ssl_context
124+ end
125+ private :setup_ssl
126+
127+ # @note to be able to hook up into #ssl_context from tests
128+ def new_ssl_context
129+ OpenSSL ::SSL ::SSLContext . new
130+ end
131+ private :new_ssl_context
132+
133+ # @overload Base#register
110134 def register
111135 require "socket"
112136 require "stud/try"
113- if @ssl_enable
114- setup_ssl
115- end # @ssl_enable
137+ @closed = Concurrent ::AtomicBoolean . new ( false )
138+ setup_ssl if @ssl_enable
116139
117140 if server?
118141 @logger . info ( "Starting tcp output listener" , :address => "#{ @host } :#{ @port } " )
119142 begin
120143 @server_socket = TCPServer . new ( @host , @port )
121144 rescue Errno ::EADDRINUSE
122- @logger . error ( "Could not start TCP server: Address in use" ,
123- :host => @host , :port => @port )
145+ @logger . error ( "Could not start tcp server: Address in use" , host : @host , port : @port )
124146 raise
125147 end
126148 if @ssl_enable
127149 @server_socket = OpenSSL ::SSL ::SSLServer . new ( @server_socket , @ssl_context )
128150 end # @ssl_enable
129- @client_threads = [ ]
151+ @client_threads = Concurrent :: Array . new
130152
131153 @accept_thread = Thread . new ( @server_socket ) do |server_socket |
154+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|server_accept" )
132155 loop do
133- Thread . start ( server_socket . accept ) do |client_socket |
156+ break if @closed . value
157+ client_socket = server_socket . accept_nonblock exception : false
158+ if client_socket == :wait_readable
159+ IO . select [ server_socket ]
160+ next
161+ end
162+ Thread . start ( client_socket ) do |client_socket |
134163 # monkeypatch a 'peer' method onto the socket.
135164 client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
136- @logger . debug ( "Accepted connection" , :client => client_socket . peer ,
137- :server => "#{ @host } :#{ @port } " )
165+ @logger . debug ( "accepted connection" , client : client_socket . peer , server : "#{ @host } :#{ @port } " )
138166 client = Client . new ( client_socket , @logger )
139167 Thread . current [ :client ] = client
168+ LogStash ::Util . set_thread_name ( "[#{ pipeline_id } ]|output|tcp|client_socket-#{ @client_threads . size } " )
140169 @client_threads << Thread . current
141- client . run
170+ client . run unless @closed . value
142171 end
143172 end
144173 end
145174
146175 @codec . on_event do |event , payload |
176+ @client_threads . select! ( &:alive? )
147177 @client_threads . each do |client_thread |
148178 client_thread [ :client ] . write ( payload )
149179 end
150- @client_threads . reject! { |t | !t . alive? }
151180 end
152181 else
153182 client_socket = nil
@@ -163,18 +192,35 @@ def register
163192 # Now send the payload
164193 client_socket . syswrite ( payload ) if w . any?
165194 rescue => e
166- @logger . warn ( "tcp output exception" , :host => @host , :port => @port ,
167- :exception => e , :backtrace => e . backtrace )
195+ log_warn "client socket failed:" , e , host : @host , port : @port , socket : ( client_socket ? client_socket . to_s : nil )
168196 client_socket . close rescue nil
169197 client_socket = nil
170198 sleep @reconnect_interval
171199 retry
172200 end
173201 end
174202 end
175- end # def register
203+ end
204+
205+ # @overload Base#receive
206+ def receive ( event )
207+ @codec . encode ( event )
208+ end
209+
210+ # @overload Base#close
211+ def close
212+ @closed . make_true
213+ @server_socket . close rescue nil if @server_socket
214+
215+ return unless @client_threads
216+ @client_threads . each do |thread |
217+ client = thread [ :client ]
218+ client . close rescue nil if client
219+ end
220+ end
176221
177222 private
223+
178224 def connect
179225 begin
180226 client_socket = TCPSocket . new ( @host , @port )
@@ -183,29 +229,40 @@ def connect
183229 begin
184230 client_socket . connect
185231 rescue OpenSSL ::SSL ::SSLError => ssle
186- @logger . error ( "SSL Error" , :exception => ssle , : backtrace => ssle . backtrace )
232+ log_error 'connect ssl failure:' , ssle , backtrace : false
187233 # NOTE(mrichar1): Hack to prevent hammering peer
188234 sleep ( 5 )
189235 raise
190236 end
191237 end
192238 client_socket . instance_eval { class << self ; include ::LogStash ::Util ::SocketPeer end }
193- @logger . debug ( "Opened connection" , :client => " #{ client_socket . peer } " )
239+ @logger . debug ( "opened connection" , :client => client_socket . peer )
194240 return client_socket
195- rescue StandardError => e
196- @logger . error ( "Failed to connect: #{ e . message } " , :exception => e . class , :backtrace => e . backtrace )
241+ rescue => e
242+ log_error 'failed to connect:' , e
197243 sleep @reconnect_interval
198244 retry
199245 end
200246 end # def connect
201247
202- private
203248 def server?
204249 @mode == "server"
205250 end # def server?
206251
207- public
208- def receive ( event )
209- @codec . encode ( event )
210- end # def receive
252+ def pipeline_id
253+ execution_context . pipeline_id || 'main'
254+ end
255+
256+ def log_warn ( msg , e , backtrace : @logger . debug? , **details )
257+ details = details . merge message : e . message , exception : e . class
258+ details [ :backtrace ] = e . backtrace if backtrace
259+ @logger . warn ( msg , details )
260+ end
261+
262+ def log_error ( msg , e , backtrace : @logger . info? , **details )
263+ details = details . merge message : e . message , exception : e . class
264+ details [ :backtrace ] = e . backtrace if backtrace
265+ @logger . error ( msg , details )
266+ end
267+
211268end # class LogStash::Outputs::Tcp
0 commit comments