Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.1.1
- Fixes an issue where payloads larger than a connection's current TCP window could be silently truncated [#49](https://github.com/logstash-plugins/logstash-output-tcp/pull/49)

## 6.1.0
- Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47)
- Fix: close server and client sockets on plugin close
Expand Down
161 changes: 92 additions & 69 deletions lib/logstash/outputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,25 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base

class Client

def initialize(socket, logger)
##
# @param socket [Socket]
# @param logger_context [#log_warn&#log_error]
def initialize(socket, logger_context)
@socket = socket
@logger = logger
@logger_context = logger_context
@queue = Queue.new
end

def run
loop do
begin
@socket.write(@queue.pop)
remaining_payload = @queue.pop
while remaining_payload && remaining_payload.bytesize > 0
written_bytes_size = @socket.write(remaining_payload)
remaining_payload = remaining_payload.byteslice(written_bytes_size..-1)
end
rescue => e
log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil)
@logger_context.log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil)
break
end
end
Expand All @@ -80,7 +87,7 @@ def write(msg)
def close
@socket.close
rescue => e
log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil)
@logger_context.log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil)
end
end # class Client

Expand Down Expand Up @@ -135,69 +142,85 @@ def register
require "socket"
require "stud/try"
@closed = Concurrent::AtomicBoolean.new(false)
@thread_no = Concurrent::AtomicFixnum.new(0)
setup_ssl if @ssl_enable

if server?
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
begin
@server_socket = TCPServer.new(@host, @port)
rescue Errno::EADDRINUSE
@logger.error("Could not start tcp server: Address in use", host: @host, port: @port)
raise
end
if @ssl_enable
@server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
end # @ssl_enable
@client_threads = Concurrent::Array.new

@accept_thread = Thread.new(@server_socket) do |server_socket|
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
loop do
break if @closed.value
client_socket = server_socket.accept_nonblock exception: false
if client_socket == :wait_readable
IO.select [ server_socket ]
next
end
Thread.start(client_socket) do |client_socket|
# monkeypatch a 'peer' method onto the socket.
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}")
client = Client.new(client_socket, @logger)
Thread.current[:client] = client
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}")
@client_threads << Thread.current
client.run unless @closed.value
end
run_as_server
else
run_as_client
end
end

def run_as_server
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
begin
@server_socket = TCPServer.new(@host, @port)
rescue Errno::EADDRINUSE
@logger.error("Could not start tcp server: Address in use", host: @host, port: @port)
raise
end
if @ssl_enable
@server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
end # @ssl_enable
@client_threads = Concurrent::Array.new

@accept_thread = Thread.new(@server_socket) do |server_socket|
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
loop do
break if @closed.value
client_socket = server_socket.accept_nonblock exception: false
if client_socket == :wait_readable
IO.select [ server_socket ]
next
end
Thread.start(client_socket) do |client_socket|
# monkeypatch a 'peer' method onto the socket.
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}")
client = Client.new(client_socket, self)
Thread.current[:client] = client
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}")
@client_threads << Thread.current
client.run unless @closed.value
end
end
end

@codec.on_event do |event, payload|
@client_threads.select!(&:alive?)
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
@codec.on_event do |event, payload|
@client_threads.select!(&:alive?)
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
else
client_socket = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
end
end

def run_as_client
client_socket = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket

writable_io = nil
while writable_io.nil? || writable_io.any? == false
readable_io, writable_io, _ = IO.select([client_socket],[client_socket])

# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
client_socket.sysread(16384) if r.any?
readable_io.each { |readable| readable.sysread(16384) }
end

# Now send the payload
client_socket.syswrite(payload) if w.any?
rescue => e
log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
while payload && payload.bytesize > 0
written_bytes_size = client_socket.syswrite(payload)
payload = payload.byteslice(written_bytes_size..-1)
end
rescue => e
log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
end
end
end
Expand All @@ -219,6 +242,18 @@ def close
end
end

def log_warn(msg, e, backtrace: @logger.debug?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.warn(msg, details)
end

def log_error(msg, e, backtrace: @logger.info?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.error(msg, details)
end

private

def connect
Expand All @@ -235,7 +270,7 @@ def connect
raise
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("opened connection", :client => client_socket.peer)
return client_socket
rescue => e
Expand All @@ -253,16 +288,4 @@ def pipeline_id
execution_context.pipeline_id || 'main'
end

def log_warn(msg, e, backtrace: @logger.debug?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.warn(msg, details)
end

def log_error(msg, e, backtrace: @logger.info?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.error(msg, details)
end

end # class LogStash::Outputs::Tcp
2 changes: 1 addition & 1 deletion logstash-output-tcp.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-tcp'
s.version = '6.1.0'
s.version = '6.1.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events over a TCP socket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down