Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fluent-plugin-detect-exceptions.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ eos
gem.test_files = gem.files.grep(/^(test)/)
gem.require_paths = ['lib']

gem.add_runtime_dependency 'fluentd', '>= 0.10'
gem.add_runtime_dependency 'fluentd', '>= 1.0', "< 2"

gem.add_development_dependency 'rake', '~> 10.3'
gem.add_development_dependency 'rake'
gem.add_development_dependency 'rubocop', '= 0.42.0'
gem.add_development_dependency 'test-unit', '~> 3.0'
gem.add_development_dependency 'flexmock', '~> 2.0'
Expand Down
17 changes: 9 additions & 8 deletions lib/fluent/plugin/out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
require 'fluent/plugin/exception_detector'
require 'fluent/output'

module Fluent
module Fluent::Plugin
# This output plugin consumes a log stream of JSON objects which contain
# single-line log messages. If a consecutive sequence of log messages form
# an exception stack trace, they forwarded as a single, combined JSON
# object. Otherwise, the input log data is forwarded as is.
class DetectExceptionsOutput < Output
class DetectExceptionsOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('detect_exceptions', self)

helpers :compat_parameters, :thread, :event_emitter

desc 'The field which contains the raw message text in the input JSON data.'
config_param :message, :string, default: ''
desc 'The prefix to be removed from the input tag when outputting a record.'
Expand All @@ -37,9 +41,8 @@ class DetectExceptionsOutput < Output
desc 'Separate log streams by this field in the input JSON data.'
config_param :stream, :string, default: ''

Fluent::Plugin.register_output('detect_exceptions', self)

def configure(conf)
compat_parameters_convert(conf)
super

if multiline_flush_interval
Expand All @@ -58,7 +61,7 @@ def start
if multiline_flush_interval
@flush_buffer_mutex = Mutex.new
@stop_check = false
@thread = Thread.new(&method(:check_flush_loop))
thread_create(:detect_exceptions, &method(:check_flush_loop))
end
end

Expand All @@ -71,15 +74,13 @@ def shutdown
# Before shutdown is not available in older fluentd versions.
# Hence, we make sure that we flush the buffers here as well.
flush_buffers
@thread.join if @multiline_flush_interval
super
end

def emit(tag, es, chain)
def process(tag, es)
es.each do |time_sec, record|
process_record(tag, time_sec, record)
end
chain.next
end

private
Expand Down
33 changes: 5 additions & 28 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'rubygems'
require 'bundler'

begin
Bundler.setup(:default, :development)
rescue Bundler::BundlerError => e
$stderr.puts e.message
$stderr.puts 'Run `bundle install` to install missing gems'
exit e.status_code
end

require 'bundler/setup'
require 'test/unit'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'fluent/test'

unless ENV.key?('VERBOSE')
nulllogger = Object.new
nulllogger.instance_eval do |_|
def respond_to_missing?
true
end

def method_missing(_method, *_args) # rubocop:disable Style/MethodMissing
end
end
# global $log variable is used by fluentd
$log = nulllogger # rubocop:disable Style/GlobalVars
end
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
$LOAD_PATH.unshift(__dir__)

require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_detect_exceptions'
63 changes: 32 additions & 31 deletions test/plugin/test_out_detect_exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

require 'flexmock/test_unit'
require_relative '../helper'
require 'fluent/test/helpers'
require 'fluent/plugin/out_detect_exceptions'
require 'json'

class DetectExceptionsOutputTest < Test::Unit::TestCase
include Fluent::Test::Helpers

def setup
Fluent::Test.setup
end
Expand Down Expand Up @@ -62,10 +65,8 @@ def setup
from examble.rb:21:in `<main>'
END

def create_driver(conf = CONFIG, tag = DEFAULT_TAG)
d = Fluent::Test::OutputTestDriver.new(Fluent::DetectExceptionsOutput, tag)
d.configure(conf)
d
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::DetectExceptionsOutput).configure(conf)
end

def log_entry(message, count, stream)
Expand All @@ -78,24 +79,24 @@ def feed_lines(driver, t, *messages, stream: nil)
count = 0
messages.each do |m|
m.each_line do |line|
driver.emit(log_entry(line, count, stream), t + count)
driver.feed(t + count, log_entry(line, count, stream))
count += 1
end
end
end

def run_driver(driver, *messages)
def run_driver(driver, tag, *messages)
t = Time.now.to_i
driver.run do
driver.run(default_tag: tag) do
feed_lines(driver, t, *messages)
end
end

def make_logs(t, *messages, stream: nil)
def make_logs(tag, t, *messages, stream: nil)
count = 0
logs = []
messages.each do |m|
logs << [t + count, log_entry(m, count, stream)]
logs << [tag, t + count, log_entry(m, count, stream)]
count += m.lines.count
end
logs
Expand All @@ -111,10 +112,10 @@ def test_exception_detection
d = create_driver
t = Time.now.to_i
messages = [ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT]
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, *messages)
end
assert_equal(make_logs(t, *messages), d.events)
assert_equal(make_logs("test.tag", t, *messages), d.events)
end

def test_ignore_nested_exceptions
Expand Down Expand Up @@ -167,7 +168,7 @@ def test_ignore_nested_exceptions

d.instance.router = router_mock

d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, json_line_with_exception + json_line_without_exception)
end
end
Expand All @@ -177,61 +178,61 @@ def test_single_language_config
cfg = 'languages java'
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
end
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + PYTHON_EXC.lines
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG, t, *expected), d.events)
end

def test_multi_language_config
cfg = 'languages python, java'
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
end
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + [PYTHON_EXC]
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG, t, *expected), d.events)
end

def test_split_exception_after_timeout
cfg = 'multiline_flush_interval 1'
d = create_driver(cfg)
t1 = 0
t2 = 0
d.run do
d.run(default_tag: DEFAULT_TAG) do
t1 = Time.now.to_i
feed_lines(d, t1, JAVA_EXC)
sleep 2
t2 = Time.now.to_i
feed_lines(d, t2, " at x\n at y\n")
end
assert_equal(make_logs(t1, JAVA_EXC) +
make_logs(t2, " at x\n", " at y\n"),
assert_equal(make_logs(DEFAULT_TAG, t1, JAVA_EXC) +
make_logs(DEFAULT_TAG, t2, " at x\n", " at y\n"),
d.events)
end

def test_do_not_split_exception_after_pause
d = create_driver
t1 = 0
t2 = 0
d.run do
d.run(default_tag: DEFAULT_TAG) do
t1 = Time.now.to_i
feed_lines(d, t1, JAVA_EXC)
sleep 1
t2 = Time.now.to_i
feed_lines(d, t2, " at x\n at y\n")
d.instance.before_shutdown
end
assert_equal(make_logs(t1, JAVA_EXC + " at x\n at y\n"), d.events)
assert_equal(make_logs("test.tag", t1, JAVA_EXC + " at x\n at y\n"), d.events)
end

def get_out_tags(remove_tag_prefix, original_tag)
cfg = "remove_tag_prefix #{remove_tag_prefix}"
d = create_driver(cfg, original_tag)
run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
d.emits.collect { |e| e[0] }.sort.uniq
d = create_driver(cfg)
run_driver(d, original_tag, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
d.events.collect { |e| e[0] }.sort.uniq
end

def test_remove_tag_prefix
Expand All @@ -247,7 +248,7 @@ def test_flush_after_max_lines
cfg = 'max_lines 2'
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, PYTHON_EXC, JAVA_EXC)
end
# Expected: the first two lines of the exception are buffered and combined.
Expand All @@ -259,14 +260,14 @@ def test_flush_after_max_lines
expected = [PYTHON_EXC.lines[0..1].join] + PYTHON_EXC.lines[2..-1] + \
[JAVA_EXC.lines[0..1].join] + [JAVA_EXC.lines[2..3].join] + \
JAVA_EXC.lines[4..-1]
assert_equal(make_logs(t, *expected), d.events)
assert_equal(make_logs(DEFAULT_TAG, t, *expected), d.events)
end

def test_separate_streams
cfg = 'stream stream'
d = create_driver(cfg)
t = Time.now.to_i
d.run do
d.run(default_tag: DEFAULT_TAG) do
feed_lines(d, t, JAVA_EXC.lines[0], stream: 'java')
feed_lines(d, t, PYTHON_EXC.lines[0..1].join, stream: 'python')
feed_lines(d, t, JAVA_EXC.lines[1..-1].join, stream: 'java')
Expand All @@ -278,10 +279,10 @@ def test_separate_streams
# because they belong to different streams.
# Note that the Java exception is only detected when 'something else'
# is processed.
expected = make_logs(t, JAVA_EXC, stream: 'java') +
make_logs(t, PYTHON_EXC, stream: 'python') +
make_logs(t, JAVA_EXC, stream: 'java') +
make_logs(t, 'something else', stream: 'java')
expected = make_logs(DEFAULT_TAG, t, JAVA_EXC, stream: 'java') +
make_logs(DEFAULT_TAG, t, PYTHON_EXC, stream: 'python') +
make_logs(DEFAULT_TAG, t, JAVA_EXC, stream: 'java') +
make_logs(DEFAULT_TAG, t, 'something else', stream: 'java')
assert_equal(expected, d.events)
end
end