Skip to content

Commit 6ab2a13

Browse files
committed
Fix race conditions in #open
1 parent d90197a commit 6ab2a13

File tree

2 files changed

+68
-106
lines changed

2 files changed

+68
-106
lines changed

lib/logstash/outputs/file.rb

Lines changed: 48 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,7 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
3939
# E.g: `/%{myfield}/`, `/test-%{myfield}/` are not valid paths
4040
config :path, :validate => :string, :required => true
4141

42-
# The format to use when writing events to the file. This value
43-
# supports any string and can include `%{name}` and other dynamic
44-
# strings.
45-
#
46-
# If this setting is omitted, the full json representation of the
47-
# event will be written as a single line.
48-
config :message_format, :validate => :string, :deprecated => "You can achieve the same behavior with the 'line' codec"
42+
config :message_format, :validate => :string, :obsolete => "You can achieve the same behavior with the 'line' codec"
4943

5044
# Flush interval (in seconds) for flushing writes to log files.
5145
# 0 will flush on every message.
@@ -81,10 +75,8 @@ def register
8175
require "fileutils" # For mkdir_p
8276

8377
@files = {}
84-
@files_lock = java.util.concurrent.locks.ReentrantReadWriteLock.new
85-
@files_read_lock = @files_lock.readLock
86-
@files_write_lock = @files_lock.writeLock
87-
78+
@io_mutex = Mutex.new
79+
8880
@path = File.expand_path(path)
8981

9082
validate_path
@@ -131,33 +123,28 @@ def root_directory
131123
public
132124
def multi_receive_encoded(events_and_encoded)
133125
encoded_by_path = Hash.new {|h,k| h[k] = []}
134-
126+
135127
events_and_encoded.each do |event,encoded|
136-
file_output_path = write_event(event)
128+
file_output_path = event_path(event)
137129
encoded_by_path[file_output_path] << encoded
138130
end
139131

140-
encoded_by_path.each do |path,chunks|
141-
fd = open(path)
142-
chunks.each(&fd.method(:write))
143-
flush(fd)
144-
end
145-
146-
close_stale_files
132+
@io_mutex.synchronize do
133+
encoded_by_path.each do |path,chunks|
134+
fd = open(path)
135+
chunks.each {|chunk| fd.write(chunk) }
136+
fd.flush
137+
end
138+
139+
close_stale_files
140+
end
147141
end # def receive
148142

149-
def with_lock(lock)
150-
lock.lock
151-
yield
152-
ensure
153-
lock.unlock
154-
end
155-
156143
public
157144
def close
158-
@logger.debug("Close: closing files")
159-
160-
with_lock(@files_read_lock) do
145+
@io_mutex.synchronize do
146+
@logger.debug("Close: closing files")
147+
161148
@files.each do |path, fd|
162149
begin
163150
fd.close
@@ -176,7 +163,7 @@ def inside_file_root?(log_path)
176163
end
177164

178165
private
179-
def write_event(event)
166+
def event_path(event)
180167
file_output_path = generate_filepath(event)
181168
if path_with_field_ref? && !inside_file_root?(file_output_path)
182169
@logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
@@ -185,7 +172,6 @@ def write_event(event)
185172
file_output_path = @failure_path
186173
end
187174
@logger.debug("File, writing event to file.", :filename => file_output_path)
188-
189175

190176
file_output_path
191177
end
@@ -221,11 +207,9 @@ def flush_pending_files
221207
return unless Time.now - @last_flush_cycle >= flush_interval
222208
@logger.debug("Starting flush cycle")
223209

224-
with_lock(@files_read_lock) do
225-
@files.each do |path, fd|
226-
@logger.debug("Flushing file", :path => path, :fd => fd)
227-
fd.flush
228-
end
210+
@files.each do |path, fd|
211+
@logger.debug("Flushing file", :path => path, :fd => fd)
212+
fd.flush
229213
end
230214

231215
@last_flush_cycle = Time.now
@@ -237,24 +221,22 @@ def close_stale_files
237221
now = Time.now
238222
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
239223

240-
with_lock(@files_write_lock) do
241-
@logger.info("Starting stale files cleanup cycle", :files => @files)
242-
inactive_files = @files.select { |path, fd| not fd.active }
243-
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
244-
inactive_files.each do |path, fd|
245-
@logger.info("Closing file %s" % path)
246-
fd.close
247-
@files.delete(path)
248-
end
249-
# mark all files as inactive, a call to write will mark them as active again
250-
@files.each { |path, fd| fd.active = false }
251-
@last_stale_cleanup_cycle = now
224+
@logger.info("Starting stale files cleanup cycle", :files => @files)
225+
inactive_files = @files.select { |path, fd| not fd.active }
226+
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
227+
inactive_files.each do |path, fd|
228+
@logger.info("Closing file %s" % path)
229+
fd.close
230+
@files.delete(path)
252231
end
232+
# mark all files as inactive, a call to write will mark them as active again
233+
@files.each { |path, fd| fd.active = false }
234+
@last_stale_cleanup_cycle = now
253235
end
254236

255237
private
256238
def cached?(path)
257-
with_lock(@files_read_lock) { @files.include?(path) && !@files[path].nil? }
239+
@files.include?(path) && !@files[path].nil?
258240
end
259241

260242
private
@@ -265,19 +247,20 @@ def deleted?(path)
265247
private
266248
def open(path)
267249
if !deleted?(path) && cached?(path)
268-
with_lock(@files_read_lock) { return @files[path] }
269-
elsif deleted?(path)
270-
with_lock(@files_write_lock) do
271-
if @create_if_deleted
272-
@logger.debug("Required path was deleted, creating the file again", :path => path)
273-
@files.delete(path)
274-
else
275-
return @files[path] if cached?(path)
276-
end
250+
return @files[path]
251+
end
252+
253+
if deleted?(path)
254+
if @create_if_deleted
255+
@logger.debug("Required path was deleted, creating the file again", :path => path)
256+
@files.delete(path)
257+
else
258+
return @files[path] if cached?(path)
277259
end
278260
end
279-
@logger.info("Opening file", :path => path)
280261

262+
@logger.info("Opening file", :path => path)
263+
281264
dir = File.dirname(path)
282265
if !Dir.exist?(dir)
283266
@logger.info("Creating directory", :directory => dir)
@@ -287,6 +270,7 @@ def open(path)
287270
FileUtils.mkdir_p(dir)
288271
end
289272
end
273+
290274
# work around a bug opening fifos (bug JRUBY-6280)
291275
stat = File.stat(path) rescue nil
292276
if stat && stat.ftype == "fifo" && LogStash::Environment.jruby?
@@ -301,21 +285,18 @@ def open(path)
301285
if gzip
302286
fd = Zlib::GzipWriter.new(fd)
303287
end
304-
with_lock(@files_write_lock) { @files[path] = IOWriter.new(fd) }
288+
@files[path] = IOWriter.new(fd)
305289
end
306290
end # class LogStash::Outputs::File
307291

308292
# wrapper class
309293
class IOWriter
310294
def initialize(io)
311295
@io = io
312-
@write_mutex = Mutex.new
313296
end
314297
def write(string)
315-
@write_mutex.synchronize do
316-
@io.write(string)
317-
@active = true
318-
end
298+
@io.write(string)
299+
@active = true
319300
end
320301
def flush
321302
@io.flush
@@ -325,6 +306,7 @@ def flush
325306
end
326307
def method_missing(method_name, *args, &block)
327308
if @io.respond_to?(method_name)
309+
328310
@io.send(method_name, *args, &block)
329311
else
330312
super

spec/outputs/file_spec.rb

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747

4848
describe "ship lots of events to a file gzipped" do
4949
Stud::Temporary.file('logstash-spec-output-file') do |tmp_file|
50-
event_count = 10000 + rand(500)
50+
event_count = 100000 + rand(500)
5151

5252
config <<-CONFIG
5353
input {
@@ -119,19 +119,20 @@
119119
{ "path" => temp_file.path, "flush_interval" => 0 }
120120
end
121121

122-
it "should recreate the required file if deleted" do
122+
it "should recreate the required file if deleted" do
123123
output = LogStash::Outputs::File.new(config)
124124
output.register
125125

126126
10.times do |i|
127127
event = LogStash::Event.new("event_id" => i)
128-
output.receive(event)
128+
output.multi_receive([event])
129129
end
130130
FileUtils.rm(temp_file)
131-
10.times do |i|
131+
10.times do |i|
132132
event = LogStash::Event.new("event_id" => i+10)
133-
output.receive(event)
133+
output.multi_receive([event])
134134
end
135+
135136
expect(FileTest.size(temp_file.path)).to be > 0
136137
end
137138

@@ -147,12 +148,12 @@
147148

148149
10.times do |i|
149150
event = LogStash::Event.new("event_id" => i)
150-
output.receive(event)
151+
output.multi_receive([event])
151152
end
152153
FileUtils.rm(temp_file)
153154
10.times do |i|
154155
event = LogStash::Event.new("event_id" => i+10)
155-
output.receive(event)
156+
output.multi_receive([event])
156157
end
157158
expect(FileTest.exist?(temp_file.path)).to be_falsey
158159
expect(FileTest.size(output.failure_path)).to be > 0
@@ -184,7 +185,7 @@
184185

185186
output = LogStash::Outputs::File.new(config)
186187
output.register
187-
output.receive(bad_event)
188+
output.multi_receive([bad_event])
188189

189190
error_file = File.join(path, config["filename_failure"])
190191

@@ -202,10 +203,10 @@
202203
output.register
203204

204205
bad_event.set('error', encoded_once)
205-
output.receive(bad_event)
206+
output.multi_receive([bad_event])
206207

207208
bad_event.set('error', encoded_twice)
208-
output.receive(bad_event)
209+
output.multi_receive([bad_event])
209210

210211
expect(Dir.glob(File.join(path, "*")).size).to eq(2)
211212
output.close
@@ -218,7 +219,7 @@
218219
output.register
219220

220221
bad_event.set('error', '../..//test')
221-
output.receive(bad_event)
222+
output.multi_receive([bad_event])
222223

223224
expect(Dir.glob(File.join(path, "*")).size).to eq(1)
224225
output.close
@@ -235,7 +236,7 @@
235236
config = { "path" => "#{path}/%{error}" }
236237
output = LogStash::Outputs::File.new(config)
237238
output.register
238-
output.receive(good_event)
239+
output.multi_receive([good_event])
239240

240241
good_file = File.join(path, good_event.get('error'))
241242
expect(File.exist?(good_file)).to eq(true)
@@ -254,7 +255,7 @@
254255
config = { "path" => dynamic_path }
255256
output = LogStash::Outputs::File.new(config)
256257
output.register
257-
output.receive(good_event)
258+
output.multi_receive([good_event])
258259

259260
expect(File.exist?(expected_path)).to eq(true)
260261
output.close
@@ -276,7 +277,7 @@
276277

277278
output = LogStash::Outputs::File.new(config)
278279
output.register
279-
output.receive(good_event)
280+
output.multi_receive([good_event])
280281

281282
expect(File.exist?(expected_path)).to eq(true)
282283
output.close
@@ -291,7 +292,7 @@
291292
config = { "path" => "#{path}/%{error}" }
292293
output = LogStash::Outputs::File.new(config)
293294
output.register
294-
output.receive(good_event)
295+
output.multi_receive([good_event])
295296

296297
good_file = File.join(path, good_event.get('error'))
297298
expect(File.exist?(good_file)).to eq(true)
@@ -310,7 +311,7 @@
310311
config = { "path" => "#{path}/output.txt" }
311312
output = LogStash::Outputs::File.new(config)
312313
output.register
313-
output.receive(good_event)
314+
output.multi_receive([good_event])
314315
good_file = File.join(path, 'output.txt')
315316
expect(File.exist?(good_file)).to eq(true)
316317
output.close #teardown first to allow reading the file
@@ -328,30 +329,9 @@
328329

329330
Stud::Temporary.directory do |path|
330331
config = { "path" => "#{path}/output.txt" }
331-
output = LogStash::Outputs::File.new(config)
332-
output.codec = LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})
333-
output.register
334-
output.receive(good_event)
335-
good_file = File.join(path, 'output.txt')
336-
expect(File.exist?(good_file)).to eq(true)
337-
output.close #teardown first to allow reading the file
338-
File.open(good_file) {|f|
339-
line = f.readline
340-
expect(line).to eq("Custom format: hello world\n")
341-
}
342-
end
343-
end
344-
end
345-
context "when using deprecated message_format config" do
346-
it 'falls back to line codec' do
347-
good_event = LogStash::Event.new
348-
good_event.set('message', 'hello world')
349-
350-
Stud::Temporary.directory do |path|
351-
config = { "path" => "#{path}/output.txt", "message_format" => "Custom format: %{message}" }
352-
output = LogStash::Outputs::File.new(config)
332+
output = LogStash::Outputs::File.new(config.merge("codec" => LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})))
353333
output.register
354-
output.receive(good_event)
334+
output.multi_receive([good_event])
355335
good_file = File.join(path, 'output.txt')
356336
expect(File.exist?(good_file)).to eq(true)
357337
output.close #teardown first to allow reading the file
@@ -375,7 +355,7 @@
375355
}
376356
output = LogStash::Outputs::File.new(config)
377357
output.register
378-
output.receive(good_event)
358+
output.multi_receive([good_event])
379359
good_file = File.join(path, 'is/nested/output.txt')
380360
expect(File.exist?(good_file)).to eq(true)
381361
expect(File.stat(good_file).mode.to_s(8)[-3..-1]).to eq('610')

0 commit comments

Comments
 (0)