diff --git a/.rubocop.yml b/.rubocop.yml index 03a7579..c17f1e9 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -22,6 +22,11 @@ Metrics/AbcSize: Metrics/ClassLength: Max: 1300 +# Offense count: 2 +# Configuration parameters: CountComments. +Metrics/LineLength: + Max: 100 + # Offense count: 1 # Configuration parameters: CountComments. Metrics/ModuleLength: diff --git a/README.rdoc b/README.rdoc index 14dec1f..64a8081 100644 --- a/README.rdoc +++ b/README.rdoc @@ -70,7 +70,7 @@ The plugin supports the following parameters: forwarded. If not set, incomplete exceptions stacks are not flushed. -[force_line_breaks] Force line breaks between each lines when comibining exception stacks. +[force_line_breaks] Force line breaks between each lines when combining exception stacks. This is useful if your exception is formatted as a single line. i.e., logs retrieved from the docker's logging driver don't have any line break. diff --git a/fluent-plugin-detect-exceptions.gemspec b/fluent-plugin-detect-exceptions.gemspec index 9003bc9..79088c1 100644 --- a/fluent-plugin-detect-exceptions.gemspec +++ b/fluent-plugin-detect-exceptions.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |gem| gem.test_files = gem.files.grep(/^(test)/) gem.require_paths = ['lib'] - gem.add_runtime_dependency 'fluentd', '>= 0.10' + gem.add_runtime_dependency 'fluentd', '>= 1.0', '< 2' gem.add_development_dependency 'flexmock', '~> 2.0' gem.add_development_dependency 'rake', '~> 10.3' diff --git a/lib/fluent/plugin/out_detect_exceptions.rb b/lib/fluent/plugin/out_detect_exceptions.rb index fa9bafc..1955982 100644 --- a/lib/fluent/plugin/out_detect_exceptions.rb +++ b/lib/fluent/plugin/out_detect_exceptions.rb @@ -16,12 +16,16 @@ require 'fluent/plugin/exception_detector' require 'fluent/output' -module Fluent +module Fluent::Plugin # This output plugin consumes a log stream of JSON objects which contain # single-line log messages. If a consecutive sequence of log messages form # an exception stack trace, they forwarded as a single, combined JSON # object. Otherwise, the input log data is forwarded as is. - class DetectExceptionsOutput < Output + class DetectExceptionsOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output('detect_exceptions', self) + + helpers :compat_parameters, :thread, :event_emitter + desc 'The prefix to be removed from the input tag when outputting a record.' config_param :remove_tag_prefix, :string desc 'The field which contains the raw message text in the input JSON data.' @@ -39,9 +43,8 @@ class DetectExceptionsOutput < Output desc 'Separate log streams by this field in the input JSON data.' config_param :stream, :string, default: '' - Fluent::Plugin.register_output('detect_exceptions', self) - def configure(conf) + compat_parameters_convert(conf) super @check_flush_interval = [multiline_flush_interval * 0.1, 1].max if multiline_flush_interval @@ -59,7 +62,7 @@ def start @flush_buffer_mutex = Mutex.new @stop_check = false - @thread = Thread.new(&method(:check_flush_loop)) + thread_create(:detect_exceptions, &method(:check_flush_loop)) end def before_shutdown @@ -71,15 +74,13 @@ def shutdown # Before shutdown is not available in older fluentd versions. # Hence, we make sure that we flush the buffers here as well. flush_buffers - @thread.join if @multiline_flush_interval super end - def emit(tag, entries, chain) + def process(tag, entries) entries.each do |time_sec, record| process_record(tag, time_sec, record) end - chain.next end private diff --git a/test/helper.rb b/test/helper.rb index b0a7a91..6e4cf88 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -12,35 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'rubygems' -require 'bundler' -begin - Bundler.setup(:default, :development) -rescue Bundler::BundlerError => e - # rubocop:disable Style/StderrPuts - $stderr.puts e.message - $stderr.puts 'Run `bundle install` to install missing gems' - # rubocop:enable Style/StderrPuts - exit e.status_code -end +require 'bundler/setup' require 'test/unit' -$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -$LOAD_PATH.unshift(File.dirname(__FILE__)) -require 'fluent/test' -unless ENV.key?('VERBOSE') - nulllogger = Object.new - nulllogger.instance_eval do |_| - def respond_to_missing?(_method, _include_private = false) - true - end - - def method_missing(_method, *_args) - # pass - end - end - # global $log variable is used by fluentd - $log = nulllogger # rubocop:disable Style/GlobalVars -end +$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib')) +$LOAD_PATH.unshift(__dir__) +require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/plugin/out_detect_exceptions' diff --git a/test/plugin/test_out_detect_exceptions.rb b/test/plugin/test_out_detect_exceptions.rb index 17e5c16..355d63e 100644 --- a/test/plugin/test_out_detect_exceptions.rb +++ b/test/plugin/test_out_detect_exceptions.rb @@ -14,10 +14,13 @@ require 'flexmock/test_unit' require_relative '../helper' +require 'fluent/test/helpers' require 'fluent/plugin/out_detect_exceptions' require 'json' class DetectExceptionsOutputTest < Test::Unit::TestCase + include Fluent::Test::Helpers + def setup Fluent::Test.setup end @@ -64,10 +67,8 @@ def setup from examble.rb:21:in `
' END_RUBY - def create_driver(conf = CONFIG, tag = DEFAULT_TAG) - d = Fluent::Test::OutputTestDriver.new(Fluent::DetectExceptionsOutput, tag) - d.configure(conf) - d + def create_driver(conf = CONFIG) + Fluent::Test::Driver::Output.new(Fluent::Plugin::DetectExceptionsOutput).configure(conf) end def log_entry(message, count, stream) @@ -81,7 +82,7 @@ def feed_lines_without_line_breaks(driver, timestamp, *messages, stream: nil) messages.each do |m| m.each_line do |line| line.delete!("\n") - driver.emit(log_entry(line, count, stream), timestamp + count) + driver.feed(timestamp + count, log_entry(line, count, stream)) count += 1 end end @@ -91,24 +92,24 @@ def feed_lines(driver, timestamp, *messages, stream: nil) count = 0 messages.each do |m| m.each_line do |line| - driver.emit(log_entry(line, count, stream), timestamp + count) + driver.feed(timestamp + count, log_entry(line, count, stream)) count += 1 end end end - def run_driver(driver, *messages) + def run_driver(driver, tag, *messages) t = Time.now.to_i - driver.run do + driver.run(default_tag: tag) do feed_lines(driver, t, *messages) end end - def make_logs(timestamp, *messages, stream: nil) + def make_logs(tag, t, *messages, stream: nil) count = 0 logs = [] messages.each do |m| - logs << [timestamp + count, log_entry(m, count, stream)] + logs << [tag, t + count, log_entry(m, count, stream)] count += m.lines.count end logs @@ -124,10 +125,10 @@ def test_exception_detection d = create_driver t = Time.now.to_i messages = [ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT] - d.run do + d.run(default_tag: DEFAULT_TAG_STRIPPED) do feed_lines(d, t, *messages) end - assert_equal(make_logs(t, *messages), d.events) + assert_equal(make_logs('test.tag', t, *messages), d.events) end def test_ignore_nested_exceptions @@ -184,7 +185,7 @@ def test_ignore_nested_exceptions d.instance.router = router_mock - d.run do + d.run(default_tag: DEFAULT_TAG_STRIPPED) do feed_lines(d, t, json_line_with_exception + json_line_without_exception) end end @@ -196,11 +197,11 @@ def test_single_language_config languages java) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG_STRIPPED) do feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC) end expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + PYTHON_EXC.lines - assert_equal(make_logs(t, *expected), d.events) + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events) end def test_multi_language_config @@ -209,11 +210,11 @@ def test_multi_language_config languages python, java) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG_STRIPPED) do feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC) end expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + [PYTHON_EXC] - assert_equal(make_logs(t, *expected), d.events) + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events) end def test_split_exception_after_timeout @@ -223,15 +224,15 @@ def test_split_exception_after_timeout d = create_driver(cfg) t1 = 0 t2 = 0 - d.run do + d.run(default_tag: DEFAULT_TAG) do t1 = Time.now.to_i feed_lines(d, t1, JAVA_EXC) sleep 2 t2 = Time.now.to_i feed_lines(d, t2, " at x\n at y\n") end - assert_equal(make_logs(t1, JAVA_EXC) + - make_logs(t2, " at x\n", " at y\n"), + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t1, JAVA_EXC) + + make_logs(DEFAULT_TAG_STRIPPED, t2, " at x\n", " at y\n"), d.events) end @@ -239,7 +240,7 @@ def test_do_not_split_exception_after_pause d = create_driver t1 = 0 t2 = 0 - d.run do + d.run(default_tag: DEFAULT_TAG) do t1 = Time.now.to_i feed_lines(d, t1, JAVA_EXC) sleep 1 @@ -247,7 +248,7 @@ def test_do_not_split_exception_after_pause feed_lines(d, t2, " at x\n at y\n") d.instance.before_shutdown end - assert_equal(make_logs(t1, "#{JAVA_EXC} at x\n at y\n"), d.events) + assert_equal(make_logs('test.tag', t1, JAVA_EXC + " at x\n at y\n"), d.events) end def test_remove_tag_prefix_is_required @@ -258,9 +259,9 @@ def test_remove_tag_prefix_is_required def get_out_tags(remove_tag_prefix, original_tag) cfg = "remove_tag_prefix #{remove_tag_prefix}" - d = create_driver(cfg, original_tag) - run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT) - d.emits.collect { |e| e[0] }.sort.uniq + d = create_driver(cfg) + run_driver(d, original_tag, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT) + d.events.collect { |e| e[0] }.sort.uniq end def test_remove_tag_prefix @@ -278,11 +279,11 @@ def test_force_line_breaks_false force_line_breaks true) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG) do feed_lines(d, t, JAVA_EXC) end expected = JAVA_EXC - assert_equal(make_logs(t, *expected), d.events) + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events) end def test_force_line_breaks_true @@ -291,7 +292,7 @@ def test_force_line_breaks_true force_line_breaks true) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG) do feed_lines_without_line_breaks(d, t, JAVA_EXC) end # Expected: the first two lines of the exception are buffered and combined. @@ -301,7 +302,7 @@ def test_force_line_breaks_true # are buffered and combined. So are the first two lines of the second # exception. Then the rest is logged line-by-line. expected = JAVA_EXC.chomp - assert_equal(make_logs(t, *expected), d.events) + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events) end def test_flush_after_max_lines @@ -310,7 +311,7 @@ def test_flush_after_max_lines max_lines 2) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG) do feed_lines(d, t, PYTHON_EXC, JAVA_EXC) end # Expected: the first two lines of the exception are buffered and combined. @@ -321,8 +322,8 @@ def test_flush_after_max_lines # exception. Then the rest is logged line-by-line. expected = [PYTHON_EXC.lines[0..1].join] + PYTHON_EXC.lines[2..] + \ [JAVA_EXC.lines[0..1].join] + [JAVA_EXC.lines[2..3].join] + \ - JAVA_EXC.lines[4..] - assert_equal(make_logs(t, *expected), d.events) + JAVA_EXC.lines[4..-1] + assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events) end def test_separate_streams @@ -331,7 +332,7 @@ def test_separate_streams stream stream) d = create_driver(cfg) t = Time.now.to_i - d.run do + d.run(default_tag: DEFAULT_TAG) do feed_lines(d, t, JAVA_EXC.lines[0], stream: 'java') feed_lines(d, t, PYTHON_EXC.lines[0..1].join, stream: 'python') feed_lines(d, t, JAVA_EXC.lines[1..].join, stream: 'java') @@ -343,10 +344,10 @@ def test_separate_streams # because they belong to different streams. # Note that the Java exception is only detected when 'something else' # is processed. - expected = make_logs(t, JAVA_EXC, stream: 'java') + - make_logs(t, PYTHON_EXC, stream: 'python') + - make_logs(t, JAVA_EXC, stream: 'java') + - make_logs(t, 'something else', stream: 'java') + expected = make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') + + make_logs(DEFAULT_TAG_STRIPPED, t, PYTHON_EXC, stream: 'python') + + make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') + + make_logs(DEFAULT_TAG_STRIPPED, t, 'something else', stream: 'java') assert_equal(expected, d.events) end end