Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ Metrics/AbcSize:
Metrics/ClassLength:
Max: 1300

# Offense count: 2
# Configuration parameters: CountComments.
Metrics/LineLength:
Max: 100

# Offense count: 1
# Configuration parameters: CountComments.
Metrics/ModuleLength:
Expand Down
2 changes: 1 addition & 1 deletion README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ The plugin supports the following parameters:
forwarded. If not set, incomplete exceptions stacks
are not flushed.

[force_line_breaks] Force line breaks between each lines when comibining exception stacks.
[force_line_breaks] Force line breaks between each lines when combining exception stacks.
This is useful if your exception is formatted
as a single line. i.e., logs retrieved from the docker's
logging driver don't have any line break.
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-detect-exceptions.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Gem::Specification.new do |gem|
gem.test_files = gem.files.grep(/^(test)/)
gem.require_paths = ['lib']

gem.add_runtime_dependency 'fluentd', '>= 0.10'
gem.add_runtime_dependency 'fluentd', '>= 1.0', '< 2'

gem.add_development_dependency 'flexmock', '~> 2.0'
gem.add_development_dependency 'rake', '~> 10.3'
Expand Down
17 changes: 9 additions & 8 deletions lib/fluent/plugin/out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
require 'fluent/plugin/exception_detector'
require 'fluent/output'

module Fluent
module Fluent::Plugin
# This output plugin consumes a log stream of JSON objects which contain
# single-line log messages. If a consecutive sequence of log messages form
# an exception stack trace, they forwarded as a single, combined JSON
# object. Otherwise, the input log data is forwarded as is.
class DetectExceptionsOutput < Output
class DetectExceptionsOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('detect_exceptions', self)

helpers :compat_parameters, :thread, :event_emitter

desc 'The prefix to be removed from the input tag when outputting a record.'
config_param :remove_tag_prefix, :string
desc 'The field which contains the raw message text in the input JSON data.'
Expand All @@ -39,9 +43,8 @@ class DetectExceptionsOutput < Output
desc 'Separate log streams by this field in the input JSON data.'
config_param :stream, :string, default: ''

Fluent::Plugin.register_output('detect_exceptions', self)

def configure(conf)
compat_parameters_convert(conf)
super

@check_flush_interval = [multiline_flush_interval * 0.1, 1].max if multiline_flush_interval
Expand All @@ -59,7 +62,7 @@ def start

@flush_buffer_mutex = Mutex.new
@stop_check = false
@thread = Thread.new(&method(:check_flush_loop))
thread_create(:detect_exceptions, &method(:check_flush_loop))
end

def before_shutdown
Expand All @@ -71,15 +74,13 @@ def shutdown
# Before shutdown is not available in older fluentd versions.
# Hence, we make sure that we flush the buffers here as well.
flush_buffers
@thread.join if @multiline_flush_interval
super
end

def emit(tag, entries, chain)
def process(tag, entries)
entries.each do |time_sec, record|
process_record(tag, time_sec, record)
end
chain.next
end

private
Expand Down
33 changes: 5 additions & 28 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'rubygems'
require 'bundler'
begin
Bundler.setup(:default, :development)
rescue Bundler::BundlerError => e
# rubocop:disable Style/StderrPuts
$stderr.puts e.message
$stderr.puts 'Run `bundle install` to install missing gems'
# rubocop:enable Style/StderrPuts
exit e.status_code
end
require 'bundler/setup'
require 'test/unit'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'fluent/test'
unless ENV.key?('VERBOSE')
nulllogger = Object.new
nulllogger.instance_eval do |_|
def respond_to_missing?(_method, _include_private = false)
true
end

def method_missing(_method, *_args)
# pass
end
end
# global $log variable is used by fluentd
$log = nulllogger # rubocop:disable Style/GlobalVars
end
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
$LOAD_PATH.unshift(__dir__)

require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_detect_exceptions'
75 changes: 38 additions & 37 deletions test/plugin/test_out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

require 'flexmock/test_unit'
require_relative '../helper'
require 'fluent/test/helpers'
require 'fluent/plugin/out_detect_exceptions'
require 'json'

class DetectExceptionsOutputTest < Test::Unit::TestCase
include Fluent::Test::Helpers

def setup
Fluent::Test.setup
end
Expand Down Expand Up @@ -64,10 +67,8 @@ def setup
from examble.rb:21:in `<main>'
END_RUBY

def create_driver(conf = CONFIG, tag = DEFAULT_TAG)
d = Fluent::Test::OutputTestDriver.new(Fluent::DetectExceptionsOutput, tag)
d.configure(conf)
d
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::DetectExceptionsOutput).configure(conf)
end

def log_entry(message, count, stream)
Expand All @@ -81,7 +82,7 @@ def feed_lines_without_line_breaks(driver, timestamp, *messages, stream: nil)
messages.each do |m|
m.each_line do |line|
line.delete!("\n")
driver.emit(log_entry(line, count, stream), timestamp + count)
driver.feed(timestamp + count, log_entry(line, count, stream))
count += 1
end
end
Expand All @@ -91,24 +92,24 @@ def feed_lines(driver, timestamp, *messages, stream: nil)
count = 0
messages.each do |m|
m.each_line do |line|
driver.emit(log_entry(line, count, stream), timestamp + count)
driver.feed(timestamp + count, log_entry(line, count, stream))
count += 1
end
end
end

def run_driver(driver, *messages)
def run_driver(driver, tag, *messages)
t = Time.now.to_i
driver.run do
driver.run(default_tag: tag) do
feed_lines(driver, t, *messages)
end
end

def make_logs(timestamp, *messages, stream: nil)
def make_logs(tag, t, *messages, stream: nil)
count = 0
logs = []
messages.each do |m|
logs << [timestamp + count, log_entry(m, count, stream)]
logs << [tag, t + count, log_entry(m, count, stream)]
count += m.lines.count
end
logs
Expand All @@ -124,10 +125,10 @@ def test_exception_detection
d = create_driver
t = Time.now.to_i
messages = [ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT]
d.run do
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
feed_lines(d, t, *messages)
end
assert_equal(make_logs(t, *messages), d.events)
assert_equal(make_logs('test.tag', t, *messages), d.events)
end

def test_ignore_nested_exceptions
Expand Down Expand Up @@ -184,7 +185,7 @@ def test_ignore_nested_exceptions

d.instance.router = router_mock

d.run do
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
feed_lines(d, t, json_line_with_exception + json_line_without_exception)
end
end
Expand All @@ -196,11 +197,11 @@ def test_single_language_config
languages java)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
end
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + PYTHON_EXC.lines
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
end

def test_multi_language_config
Expand All @@ -209,11 +210,11 @@ def test_multi_language_config
languages python, java)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
end
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + [PYTHON_EXC]
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
end

def test_split_exception_after_timeout
Expand All @@ -223,31 +224,31 @@ def test_split_exception_after_timeout
d = create_driver(cfg)
t1 = 0
t2 = 0
d.run do
d.run(default_tag: DEFAULT_TAG) do
t1 = Time.now.to_i
feed_lines(d, t1, JAVA_EXC)
sleep 2
t2 = Time.now.to_i
feed_lines(d, t2, " at x\n at y\n")
end
assert_equal(make_logs(t1, JAVA_EXC) +
make_logs(t2, " at x\n", " at y\n"),
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t1, JAVA_EXC) +
make_logs(DEFAULT_TAG_STRIPPED, t2, " at x\n", " at y\n"),
d.events)
end

def test_do_not_split_exception_after_pause
d = create_driver
t1 = 0
t2 = 0
d.run do
d.run(default_tag: DEFAULT_TAG) do
t1 = Time.now.to_i
feed_lines(d, t1, JAVA_EXC)
sleep 1
t2 = Time.now.to_i
feed_lines(d, t2, " at x\n at y\n")
d.instance.before_shutdown
end
assert_equal(make_logs(t1, "#{JAVA_EXC} at x\n at y\n"), d.events)
assert_equal(make_logs('test.tag', t1, JAVA_EXC + " at x\n at y\n"), d.events)
end

def test_remove_tag_prefix_is_required
Expand All @@ -258,9 +259,9 @@ def test_remove_tag_prefix_is_required

def get_out_tags(remove_tag_prefix, original_tag)
cfg = "remove_tag_prefix #{remove_tag_prefix}"
d = create_driver(cfg, original_tag)
run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
d.emits.collect { |e| e[0] }.sort.uniq
d = create_driver(cfg)
run_driver(d, original_tag, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
d.events.collect { |e| e[0] }.sort.uniq
end

def test_remove_tag_prefix
Expand All @@ -278,11 +279,11 @@ def test_force_line_breaks_false
force_line_breaks true)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, JAVA_EXC)
end
expected = JAVA_EXC
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
end

def test_force_line_breaks_true
Expand All @@ -291,7 +292,7 @@ def test_force_line_breaks_true
force_line_breaks true)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines_without_line_breaks(d, t, JAVA_EXC)
end
# Expected: the first two lines of the exception are buffered and combined.
Expand All @@ -301,7 +302,7 @@ def test_force_line_breaks_true
# are buffered and combined. So are the first two lines of the second
# exception. Then the rest is logged line-by-line.
expected = JAVA_EXC.chomp
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
end

def test_flush_after_max_lines
Expand All @@ -310,7 +311,7 @@ def test_flush_after_max_lines
max_lines 2)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, PYTHON_EXC, JAVA_EXC)
end
# Expected: the first two lines of the exception are buffered and combined.
Expand All @@ -321,8 +322,8 @@ def test_flush_after_max_lines
# exception. Then the rest is logged line-by-line.
expected = [PYTHON_EXC.lines[0..1].join] + PYTHON_EXC.lines[2..] + \
[JAVA_EXC.lines[0..1].join] + [JAVA_EXC.lines[2..3].join] + \
JAVA_EXC.lines[4..]
assert_equal(make_logs(t, *expected), d.events)
JAVA_EXC.lines[4..-1]
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
end

def test_separate_streams
Expand All @@ -331,7 +332,7 @@ def test_separate_streams
stream stream)
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, JAVA_EXC.lines[0], stream: 'java')
feed_lines(d, t, PYTHON_EXC.lines[0..1].join, stream: 'python')
feed_lines(d, t, JAVA_EXC.lines[1..].join, stream: 'java')
Expand All @@ -343,10 +344,10 @@ def test_separate_streams
# because they belong to different streams.
# Note that the Java exception is only detected when 'something else'
# is processed.
expected = make_logs(t, JAVA_EXC, stream: 'java') +
make_logs(t, PYTHON_EXC, stream: 'python') +
make_logs(t, JAVA_EXC, stream: 'java') +
make_logs(t, 'something else', stream: 'java')
expected = make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
make_logs(DEFAULT_TAG_STRIPPED, t, PYTHON_EXC, stream: 'python') +
make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
make_logs(DEFAULT_TAG_STRIPPED, t, 'something else', stream: 'java')
assert_equal(expected, d.events)
end
end