Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 4.0.0
- Make 'message_format' option obsolete
- Use new Logsash 2.4/5.0 APIs for working batchwise and with shared concurrency

## 3.0.2
- Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99

Expand Down
77 changes: 47 additions & 30 deletions lib/logstash/outputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# }
# }
class LogStash::Outputs::File < LogStash::Outputs::Base
concurrency :shared

FIELD_REF = /%\{[^}]+\}/

config_name "file"
Expand All @@ -35,13 +37,7 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
# E.g: `/%{myfield}/`, `/test-%{myfield}/` are not valid paths
config :path, :validate => :string, :required => true

# The format to use when writing events to the file. This value
# supports any string and can include `%{name}` and other dynamic
# strings.
#
# If this setting is omitted, the full json representation of the
# event will be written as a single line.
config :message_format, :validate => :string, :deprecated => "You can achieve the same behavior with the 'line' codec"
config :message_format, :validate => :string, :obsolete => "You can achieve the same behavior with the 'line' codec"

# Flush interval (in seconds) for flushing writes to log files.
# 0 will flush on every message.
Expand Down Expand Up @@ -76,10 +72,9 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
def register
require "fileutils" # For mkdir_p

workers_not_supported

@files = {}

@io_mutex = Mutex.new

@path = File.expand_path(path)

validate_path
Expand All @@ -91,6 +86,7 @@ def register
end
@failure_path = File.join(@file_root, @filename_failure)


now = Time.now
@last_flush_cycle = now
@last_stale_cleanup_cycle = now
Expand All @@ -101,8 +97,6 @@ def register
@codec = LogStash::Plugin.lookup("codec", "line").new
@codec.format = @message_format
end

@codec.on_event(&method(:write_event))
end # def register

private
Expand All @@ -125,20 +119,37 @@ def root_directory
end

public
def receive(event)
@codec.encode(event)
close_stale_files
def multi_receive_encoded(events_and_encoded)
encoded_by_path = Hash.new {|h,k| h[k] = []}

events_and_encoded.each do |event,encoded|
file_output_path = event_path(event)
encoded_by_path[file_output_path] << encoded
end

@io_mutex.synchronize do
encoded_by_path.each do |path,chunks|
fd = open(path)
chunks.each {|chunk| fd.write(chunk) }
fd.flush
end

close_stale_files
end
end # def receive

public
def close
@logger.debug("Close: closing files")
@files.each do |path, fd|
begin
fd.close
@logger.debug("Closed file #{path}", :fd => fd)
rescue Exception => e
@logger.error("Exception while flushing and closing files.", :exception => e)
@io_mutex.synchronize do
@logger.debug("Close: closing files")

@files.each do |path, fd|
begin
fd.close
@logger.debug("Closed file #{path}", :fd => fd)
rescue Exception => e
@logger.error("Exception while flushing and closing files.", :exception => e)
end
end
end
end
Expand All @@ -150,7 +161,7 @@ def inside_file_root?(log_path)
end

private
def write_event(event, data)
def event_path(event)
file_output_path = generate_filepath(event)
if path_with_field_ref? && !inside_file_root?(file_output_path)
@logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
Expand All @@ -159,10 +170,8 @@ def write_event(event, data)
file_output_path = @failure_path
end
@logger.debug("File, writing event to file.", :filename => file_output_path)
fd = open(file_output_path)
# TODO(sissel): Check if we should rotate the file.
fd.write(data)
flush(fd)

file_output_path
end

private
Expand Down Expand Up @@ -195,10 +204,12 @@ def flush(fd)
def flush_pending_files
return unless Time.now - @last_flush_cycle >= flush_interval
@logger.debug("Starting flush cycle")

@files.each do |path, fd|
@logger.debug("Flushing file", :path => path, :fd => fd)
fd.flush
end

@last_flush_cycle = Time.now
end

Expand All @@ -207,6 +218,7 @@ def flush_pending_files
def close_stale_files
now = Time.now
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval

@logger.info("Starting stale files cleanup cycle", :files => @files)
inactive_files = @files.select { |path, fd| not fd.active }
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
Expand All @@ -222,7 +234,7 @@ def close_stale_files

private
def cached?(path)
@files.include?(path) && !@files[path].nil?
@files.include?(path) && !@files[path].nil?
end

private
Expand All @@ -234,16 +246,19 @@ def deleted?(path)
def open(path)
if !deleted?(path) && cached?(path)
return @files[path]
elsif deleted?(path)
end

if deleted?(path)
if @create_if_deleted
@logger.debug("Required path was deleted, creating the file again", :path => path)
@files.delete(path)
else
return @files[path] if cached?(path)
end
end
@logger.info("Opening file", :path => path)

@logger.info("Opening file", :path => path)

dir = File.dirname(path)
if !Dir.exist?(dir)
@logger.info("Creating directory", :directory => dir)
Expand All @@ -253,6 +268,7 @@ def open(path)
FileUtils.mkdir_p(dir)
end
end

# work around a bug opening fifos (bug JRUBY-6280)
stat = File.stat(path) rescue nil
if stat && stat.ftype == "fifo" && LogStash::Environment.jruby?
Expand Down Expand Up @@ -288,6 +304,7 @@ def flush
end
def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)

@io.send(method_name, *args, &block)
else
super
Expand Down
4 changes: 2 additions & 2 deletions logstash-output-file.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-file'
s.version = '3.0.2'
s.version = '4.0.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output will write events to files on disk"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -20,7 +20,7 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.0.0", "< 2.99"
s.add_runtime_dependency 'logstash-codec-json_lines'
s.add_runtime_dependency 'logstash-codec-line'

Expand Down
56 changes: 18 additions & 38 deletions spec/outputs/file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

describe "ship lots of events to a file gzipped" do
Stud::Temporary.file('logstash-spec-output-file') do |tmp_file|
event_count = 10000 + rand(500)
event_count = 100000 + rand(500)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did catch some locking bugs here by upping this count to be a longer test.


config <<-CONFIG
input {
Expand Down Expand Up @@ -125,13 +125,14 @@

10.times do |i|
event = LogStash::Event.new("event_id" => i)
output.receive(event)
output.multi_receive([event])
end
FileUtils.rm(temp_file)
10.times do |i|
event = LogStash::Event.new("event_id" => i+10)
output.receive(event)
output.multi_receive([event])
end

expect(FileTest.size(temp_file.path)).to be > 0
end

Expand All @@ -147,12 +148,12 @@

10.times do |i|
event = LogStash::Event.new("event_id" => i)
output.receive(event)
output.multi_receive([event])
end
FileUtils.rm(temp_file)
10.times do |i|
event = LogStash::Event.new("event_id" => i+10)
output.receive(event)
output.multi_receive([event])
end
expect(FileTest.exist?(temp_file.path)).to be_falsey
expect(FileTest.size(output.failure_path)).to be > 0
Expand Down Expand Up @@ -184,7 +185,7 @@

output = LogStash::Outputs::File.new(config)
output.register
output.receive(bad_event)
output.multi_receive([bad_event])

error_file = File.join(path, config["filename_failure"])

Expand All @@ -202,10 +203,10 @@
output.register

bad_event.set('error', encoded_once)
output.receive(bad_event)
output.multi_receive([bad_event])

bad_event.set('error', encoded_twice)
output.receive(bad_event)
output.multi_receive([bad_event])

expect(Dir.glob(File.join(path, "*")).size).to eq(2)
output.close
Expand All @@ -218,7 +219,7 @@
output.register

bad_event.set('error', '../..//test')
output.receive(bad_event)
output.multi_receive([bad_event])

expect(Dir.glob(File.join(path, "*")).size).to eq(1)
output.close
Expand All @@ -235,7 +236,7 @@
config = { "path" => "#{path}/%{error}" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

good_file = File.join(path, good_event.get('error'))
expect(File.exist?(good_file)).to eq(true)
Expand All @@ -254,7 +255,7 @@
config = { "path" => dynamic_path }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

expect(File.exist?(expected_path)).to eq(true)
output.close
Expand All @@ -276,7 +277,7 @@

output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

expect(File.exist?(expected_path)).to eq(true)
output.close
Expand All @@ -291,7 +292,7 @@
config = { "path" => "#{path}/%{error}" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])

good_file = File.join(path, good_event.get('error'))
expect(File.exist?(good_file)).to eq(true)
Expand All @@ -310,7 +311,7 @@
config = { "path" => "#{path}/output.txt" }
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
Expand All @@ -328,30 +329,9 @@

Stud::Temporary.directory do |path|
config = { "path" => "#{path}/output.txt" }
output = LogStash::Outputs::File.new(config)
output.codec = LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})
output.register
output.receive(good_event)
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
File.open(good_file) {|f|
line = f.readline
expect(line).to eq("Custom format: hello world\n")
}
end
end
end
context "when using deprecated message_format config" do
it 'falls back to line codec' do
good_event = LogStash::Event.new
good_event.set('message', 'hello world')

Stud::Temporary.directory do |path|
config = { "path" => "#{path}/output.txt", "message_format" => "Custom format: %{message}" }
output = LogStash::Outputs::File.new(config)
output = LogStash::Outputs::File.new(config.merge("codec" => LogStash::Codecs::Line.new({ "format" => "Custom format: %{message}"})))
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'output.txt')
expect(File.exist?(good_file)).to eq(true)
output.close #teardown first to allow reading the file
Expand All @@ -375,7 +355,7 @@
}
output = LogStash::Outputs::File.new(config)
output.register
output.receive(good_event)
output.multi_receive([good_event])
good_file = File.join(path, 'is/nested/output.txt')
expect(File.exist?(good_file)).to eq(true)
expect(File.stat(good_file).mode.to_s(8)[-3..-1]).to eq('610')
Expand Down