Skip to content

Commit d90197a

Browse files
committed
Use concurrency :shared and sync codecs to optimize performance
This provides a nice boost: Before: ``` time bin/logstash -e "input { generator { count => 3000000} } filter { } output { file { path => '/tmp/newfileout'} }" Settings: Default pipeline workers: 8 Pipeline main started Pipeline main has been shutdown stopping pipeline {:id=>"main"} 139.95 real 223.61 user 28.93 sys ``` After ``` rm /tmp/newfileout; time bin/logstash -e "input { generator { count => 3000000} } filter { } output { file { codec => json_lines path => '/tmp/newfileout'} }" ; ls -lh /tmp/newfileout Settings: Default pipeline workers: 8 Pipeline main started Pipeline main has been shutdown stopping pipeline {:id=>"main"} 56.12 real 192.99 user 17.38 sys ```
1 parent 2cf8878 commit d90197a

File tree

1 file changed

+78
-41
lines changed

1 file changed

+78
-41
lines changed

lib/logstash/outputs/file.rb

Lines changed: 78 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@
33
require "logstash/outputs/base"
44
require "logstash/errors"
55
require "zlib"
6+
java_import java.util.concurrent.locks.ReentrantReadWriteLock
7+
java_import java.lang.StringBuilder
68

79
# This output writes events to files on disk. You can use fields
810
# from the event as parts of the filename and/or path.
@@ -17,6 +19,8 @@
1719
# }
1820
# }
1921
class LogStash::Outputs::File < LogStash::Outputs::Base
22+
concurrency :shared
23+
2024
FIELD_REF = /%\{[^}]+\}/
2125

2226
config_name "file"
@@ -76,9 +80,10 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
7680
def register
7781
require "fileutils" # For mkdir_p
7882

79-
workers_not_supported
80-
8183
@files = {}
84+
@files_lock = java.util.concurrent.locks.ReentrantReadWriteLock.new
85+
@files_read_lock = @files_lock.readLock
86+
@files_write_lock = @files_lock.writeLock
8287

8388
@path = File.expand_path(path)
8489

@@ -91,6 +96,7 @@ def register
9196
end
9297
@failure_path = File.join(@file_root, @filename_failure)
9398

99+
94100
now = Time.now
95101
@last_flush_cycle = now
96102
@last_stale_cleanup_cycle = now
@@ -101,8 +107,6 @@ def register
101107
@codec = LogStash::Plugin.lookup("codec", "line").new
102108
@codec.format = @message_format
103109
end
104-
105-
@codec.on_event(&method(:write_event))
106110
end # def register
107111

108112
private
@@ -125,20 +129,42 @@ def root_directory
125129
end
126130

127131
public
128-
def receive(event)
129-
@codec.encode(event)
132+
def multi_receive_encoded(events_and_encoded)
133+
encoded_by_path = Hash.new {|h,k| h[k] = []}
134+
135+
events_and_encoded.each do |event,encoded|
136+
file_output_path = write_event(event)
137+
encoded_by_path[file_output_path] << encoded
138+
end
139+
140+
encoded_by_path.each do |path,chunks|
141+
fd = open(path)
142+
chunks.each(&fd.method(:write))
143+
flush(fd)
144+
end
145+
130146
close_stale_files
131147
end # def receive
132148

149+
def with_lock(lock)
150+
lock.lock
151+
yield
152+
ensure
153+
lock.unlock
154+
end
155+
133156
public
134157
def close
135158
@logger.debug("Close: closing files")
136-
@files.each do |path, fd|
137-
begin
138-
fd.close
139-
@logger.debug("Closed file #{path}", :fd => fd)
140-
rescue Exception => e
141-
@logger.error("Exception while flushing and closing files.", :exception => e)
159+
160+
with_lock(@files_read_lock) do
161+
@files.each do |path, fd|
162+
begin
163+
fd.close
164+
@logger.debug("Closed file #{path}", :fd => fd)
165+
rescue Exception => e
166+
@logger.error("Exception while flushing and closing files.", :exception => e)
167+
end
142168
end
143169
end
144170
end
@@ -150,7 +176,7 @@ def inside_file_root?(log_path)
150176
end
151177

152178
private
153-
def write_event(event, data)
179+
def write_event(event)
154180
file_output_path = generate_filepath(event)
155181
if path_with_field_ref? && !inside_file_root?(file_output_path)
156182
@logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
@@ -159,10 +185,9 @@ def write_event(event, data)
159185
file_output_path = @failure_path
160186
end
161187
@logger.debug("File, writing event to file.", :filename => file_output_path)
162-
fd = open(file_output_path)
163-
# TODO(sissel): Check if we should rotate the file.
164-
fd.write(data)
165-
flush(fd)
188+
189+
190+
file_output_path
166191
end
167192

168193
private
@@ -195,10 +220,14 @@ def flush(fd)
195220
def flush_pending_files
196221
return unless Time.now - @last_flush_cycle >= flush_interval
197222
@logger.debug("Starting flush cycle")
198-
@files.each do |path, fd|
199-
@logger.debug("Flushing file", :path => path, :fd => fd)
200-
fd.flush
223+
224+
with_lock(@files_read_lock) do
225+
@files.each do |path, fd|
226+
@logger.debug("Flushing file", :path => path, :fd => fd)
227+
fd.flush
228+
end
201229
end
230+
202231
@last_flush_cycle = Time.now
203232
end
204233

@@ -207,22 +236,25 @@ def flush_pending_files
207236
def close_stale_files
208237
now = Time.now
209238
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
210-
@logger.info("Starting stale files cleanup cycle", :files => @files)
211-
inactive_files = @files.select { |path, fd| not fd.active }
212-
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
213-
inactive_files.each do |path, fd|
214-
@logger.info("Closing file %s" % path)
215-
fd.close
216-
@files.delete(path)
239+
240+
with_lock(@files_write_lock) do
241+
@logger.info("Starting stale files cleanup cycle", :files => @files)
242+
inactive_files = @files.select { |path, fd| not fd.active }
243+
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
244+
inactive_files.each do |path, fd|
245+
@logger.info("Closing file %s" % path)
246+
fd.close
247+
@files.delete(path)
248+
end
249+
# mark all files as inactive, a call to write will mark them as active again
250+
@files.each { |path, fd| fd.active = false }
251+
@last_stale_cleanup_cycle = now
217252
end
218-
# mark all files as inactive, a call to write will mark them as active again
219-
@files.each { |path, fd| fd.active = false }
220-
@last_stale_cleanup_cycle = now
221253
end
222254

223255
private
224256
def cached?(path)
225-
@files.include?(path) && !@files[path].nil?
257+
with_lock(@files_read_lock) { @files.include?(path) && !@files[path].nil? }
226258
end
227259

228260
private
@@ -233,13 +265,15 @@ def deleted?(path)
233265
private
234266
def open(path)
235267
if !deleted?(path) && cached?(path)
236-
return @files[path]
268+
with_lock(@files_read_lock) { return @files[path] }
237269
elsif deleted?(path)
238-
if @create_if_deleted
239-
@logger.debug("Required path was deleted, creating the file again", :path => path)
240-
@files.delete(path)
241-
else
242-
return @files[path] if cached?(path)
270+
with_lock(@files_write_lock) do
271+
if @create_if_deleted
272+
@logger.debug("Required path was deleted, creating the file again", :path => path)
273+
@files.delete(path)
274+
else
275+
return @files[path] if cached?(path)
276+
end
243277
end
244278
end
245279
@logger.info("Opening file", :path => path)
@@ -267,18 +301,21 @@ def open(path)
267301
if gzip
268302
fd = Zlib::GzipWriter.new(fd)
269303
end
270-
@files[path] = IOWriter.new(fd)
304+
with_lock(@files_write_lock) { @files[path] = IOWriter.new(fd) }
271305
end
272306
end # class LogStash::Outputs::File
273307

274308
# wrapper class
275309
class IOWriter
276310
def initialize(io)
277311
@io = io
312+
@write_mutex = Mutex.new
278313
end
279-
def write(*args)
280-
@io.write(*args)
281-
@active = true
314+
def write(string)
315+
@write_mutex.synchronize do
316+
@io.write(string)
317+
@active = true
318+
end
282319
end
283320
def flush
284321
@io.flush

0 commit comments

Comments (0)