Skip to content

Commit 24dd435

Browse files
author
Cryptophobia
committed
feat: reconcile upstream with v1 API PR
- fixing missing code to make v1 API commit work with upstream changes - fixing functions for new parameter force_line_breaks - fixing rubocop errors and exanded lineLength to 100 chars - all tests pass on ruby 2.7.4 - this combines GoogleCloudPlatform#45 (resolves GoogleCloudPlatform#45) and reconciles it for the latest code changes in upstream Signed-off-by: Cryptophobia <[email protected]>
1 parent 17c846b commit 24dd435

File tree

6 files changed

+59
-75
lines changed

6 files changed

+59
-75
lines changed

.rubocop.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ Metrics/AbcSize:
1414
Metrics/ClassLength:
1515
Max: 1300
1616

17+
# Offense count: 2
18+
# Configuration parameters: CountComments.
19+
Metrics/LineLength:
20+
Max: 100
21+
1722
# Offense count: 1
1823
# Configuration parameters: CountComments.
1924
Metrics/ModuleLength:

README.rdoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ The plugin supports the following parameters:
7070
forwarded. If not set, incomplete exceptions stacks
7171
are not flushed.
7272

73-
[force_line_breaks] Force line breaks between each lines when comibining exception stacks.
73+
[force_line_breaks] Force line breaks between each lines when combining exception stacks.
7474
This is useful if your exception is formatted
7575
as a single line. i.e., logs retrieved from the docker's
7676
logging driver don't have any line break.

fluent-plugin-detect-exceptions.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ eos
2020
gem.test_files = gem.files.grep(/^(test)/)
2121
gem.require_paths = ['lib']
2222

23-
gem.add_runtime_dependency 'fluentd', '>= 0.10'
23+
gem.add_runtime_dependency 'fluentd', '>= 1.0', '< 2'
2424

25-
gem.add_development_dependency 'rake', '~> 10.3'
25+
gem.add_development_dependency 'rake'
2626
gem.add_development_dependency 'rubocop', '= 0.42.0'
2727
gem.add_development_dependency 'test-unit', '~> 3.0'
2828
gem.add_development_dependency 'flexmock', '~> 2.0'

lib/fluent/plugin/out_detect_exceptions.rb

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
require 'fluent/plugin/exception_detector'
1717
require 'fluent/output'
1818

19-
module Fluent
19+
module Fluent::Plugin
2020
# This output plugin consumes a log stream of JSON objects which contain
2121
# single-line log messages. If a consecutive sequence of log messages form
2222
# an exception stack trace, they forwarded as a single, combined JSON
2323
# object. Otherwise, the input log data is forwarded as is.
24-
class DetectExceptionsOutput < Output
24+
class DetectExceptionsOutput < Fluent::Plugin::Output
25+
Fluent::Plugin.register_output('detect_exceptions', self)
26+
27+
helpers :compat_parameters, :thread, :event_emitter
28+
2529
desc 'The prefix to be removed from the input tag when outputting a record.'
2630
config_param :remove_tag_prefix, :string
2731
desc 'The field which contains the raw message text in the input JSON data.'
@@ -39,9 +43,8 @@ class DetectExceptionsOutput < Output
3943
desc 'Separate log streams by this field in the input JSON data.'
4044
config_param :stream, :string, default: ''
4145

42-
Fluent::Plugin.register_output('detect_exceptions', self)
43-
4446
def configure(conf)
47+
compat_parameters_convert(conf)
4548
super
4649

4750
if multiline_flush_interval
@@ -60,7 +63,7 @@ def start
6063
if multiline_flush_interval
6164
@flush_buffer_mutex = Mutex.new
6265
@stop_check = false
63-
@thread = Thread.new(&method(:check_flush_loop))
66+
thread_create(:detect_exceptions, &method(:check_flush_loop))
6467
end
6568
end
6669

@@ -73,15 +76,13 @@ def shutdown
7376
# Before shutdown is not available in older fluentd versions.
7477
# Hence, we make sure that we flush the buffers here as well.
7578
flush_buffers
76-
@thread.join if @multiline_flush_interval
7779
super
7880
end
7981

80-
def emit(tag, es, chain)
82+
def process(tag, es)
8183
es.each do |time_sec, record|
8284
process_record(tag, time_sec, record)
8385
end
84-
chain.next
8586
end
8687

8788
private

test/helper.rb

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
require 'rubygems'
16-
require 'bundler'
17-
18-
begin
19-
Bundler.setup(:default, :development)
20-
rescue Bundler::BundlerError => e
21-
$stderr.puts e.message
22-
$stderr.puts 'Run `bundle install` to install missing gems'
23-
exit e.status_code
24-
end
25-
15+
require 'bundler/setup'
2616
require 'test/unit'
2717

28-
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
29-
$LOAD_PATH.unshift(File.dirname(__FILE__))
30-
require 'fluent/test'
31-
32-
unless ENV.key?('VERBOSE')
33-
nulllogger = Object.new
34-
nulllogger.instance_eval do |_|
35-
def respond_to_missing?
36-
true
37-
end
38-
39-
def method_missing(_method, *_args) # rubocop:disable Style/MethodMissing
40-
end
41-
end
42-
# global $log variable is used by fluentd
43-
$log = nulllogger # rubocop:disable Style/GlobalVars
44-
end
18+
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
19+
$LOAD_PATH.unshift(__dir__)
4520

21+
require 'fluent/test'
22+
require 'fluent/test/driver/output'
4623
require 'fluent/plugin/out_detect_exceptions'

test/plugin/test_out_detect_exceptions.rb

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414

1515
require 'flexmock/test_unit'
1616
require_relative '../helper'
17+
require 'fluent/test/helpers'
1718
require 'fluent/plugin/out_detect_exceptions'
1819
require 'json'
1920

2021
class DetectExceptionsOutputTest < Test::Unit::TestCase
22+
include Fluent::Test::Helpers
23+
2124
def setup
2225
Fluent::Test.setup
2326
end
@@ -64,10 +67,8 @@ def setup
6467
from examble.rb:21:in `<main>'
6568
END
6669

67-
def create_driver(conf = CONFIG, tag = DEFAULT_TAG)
68-
d = Fluent::Test::OutputTestDriver.new(Fluent::DetectExceptionsOutput, tag)
69-
d.configure(conf)
70-
d
70+
def create_driver(conf = CONFIG)
71+
Fluent::Test::Driver::Output.new(Fluent::Plugin::DetectExceptionsOutput).configure(conf)
7172
end
7273

7374
def log_entry(message, count, stream)
@@ -81,7 +82,7 @@ def feed_lines_without_line_breaks(driver, t, *messages, stream: nil)
8182
messages.each do |m|
8283
m.each_line do |line|
8384
line.delete!("\n")
84-
driver.emit(log_entry(line, count, stream), t + count)
85+
driver.feed(t + count, log_entry(line, count, stream))
8586
count += 1
8687
end
8788
end
@@ -91,24 +92,24 @@ def feed_lines(driver, t, *messages, stream: nil)
9192
count = 0
9293
messages.each do |m|
9394
m.each_line do |line|
94-
driver.emit(log_entry(line, count, stream), t + count)
95+
driver.feed(t + count, log_entry(line, count, stream))
9596
count += 1
9697
end
9798
end
9899
end
99100

100-
def run_driver(driver, *messages)
101+
def run_driver(driver, tag, *messages)
101102
t = Time.now.to_i
102-
driver.run do
103+
driver.run(default_tag: tag) do
103104
feed_lines(driver, t, *messages)
104105
end
105106
end
106107

107-
def make_logs(t, *messages, stream: nil)
108+
def make_logs(tag, t, *messages, stream: nil)
108109
count = 0
109110
logs = []
110111
messages.each do |m|
111-
logs << [t + count, log_entry(m, count, stream)]
112+
logs << [tag, t + count, log_entry(m, count, stream)]
112113
count += m.lines.count
113114
end
114115
logs
@@ -124,10 +125,10 @@ def test_exception_detection
124125
d = create_driver
125126
t = Time.now.to_i
126127
messages = [ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT]
127-
d.run do
128+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
128129
feed_lines(d, t, *messages)
129130
end
130-
assert_equal(make_logs(t, *messages), d.events)
131+
assert_equal(make_logs('test.tag', t, *messages), d.events)
131132
end
132133

133134
def test_ignore_nested_exceptions
@@ -182,7 +183,7 @@ def test_ignore_nested_exceptions
182183

183184
d.instance.router = router_mock
184185

185-
d.run do
186+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
186187
feed_lines(d, t, json_line_with_exception + json_line_without_exception)
187188
end
188189
end
@@ -194,11 +195,11 @@ def test_single_language_config
194195
languages java)
195196
d = create_driver(cfg)
196197
t = Time.now.to_i
197-
d.run do
198+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
198199
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
199200
end
200201
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + PYTHON_EXC.lines
201-
assert_equal(make_logs(t, *expected), d.events)
202+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
202203
end
203204

204205
def test_multi_language_config
@@ -207,11 +208,11 @@ def test_multi_language_config
207208
languages python, java)
208209
d = create_driver(cfg)
209210
t = Time.now.to_i
210-
d.run do
211+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
211212
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
212213
end
213214
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + [PYTHON_EXC]
214-
assert_equal(make_logs(t, *expected), d.events)
215+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
215216
end
216217

217218
def test_split_exception_after_timeout
@@ -221,31 +222,31 @@ def test_split_exception_after_timeout
221222
d = create_driver(cfg)
222223
t1 = 0
223224
t2 = 0
224-
d.run do
225+
d.run(default_tag: DEFAULT_TAG) do
225226
t1 = Time.now.to_i
226227
feed_lines(d, t1, JAVA_EXC)
227228
sleep 2
228229
t2 = Time.now.to_i
229230
feed_lines(d, t2, " at x\n at y\n")
230231
end
231-
assert_equal(make_logs(t1, JAVA_EXC) +
232-
make_logs(t2, " at x\n", " at y\n"),
232+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t1, JAVA_EXC) +
233+
make_logs(DEFAULT_TAG_STRIPPED, t2, " at x\n", " at y\n"),
233234
d.events)
234235
end
235236

236237
def test_do_not_split_exception_after_pause
237238
d = create_driver
238239
t1 = 0
239240
t2 = 0
240-
d.run do
241+
d.run(default_tag: DEFAULT_TAG) do
241242
t1 = Time.now.to_i
242243
feed_lines(d, t1, JAVA_EXC)
243244
sleep 1
244245
t2 = Time.now.to_i
245246
feed_lines(d, t2, " at x\n at y\n")
246247
d.instance.before_shutdown
247248
end
248-
assert_equal(make_logs(t1, JAVA_EXC + " at x\n at y\n"), d.events)
249+
assert_equal(make_logs('test.tag', t1, JAVA_EXC + " at x\n at y\n"), d.events)
249250
end
250251

251252
def test_remove_tag_prefix_is_required
@@ -256,9 +257,9 @@ def test_remove_tag_prefix_is_required
256257

257258
def get_out_tags(remove_tag_prefix, original_tag)
258259
cfg = "remove_tag_prefix #{remove_tag_prefix}"
259-
d = create_driver(cfg, original_tag)
260-
run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
261-
d.emits.collect { |e| e[0] }.sort.uniq
260+
d = create_driver(cfg)
261+
run_driver(d, original_tag, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
262+
d.events.collect { |e| e[0] }.sort.uniq
262263
end
263264

264265
def test_remove_tag_prefix
@@ -276,11 +277,11 @@ def test_force_line_breaks_false
276277
force_line_breaks true)
277278
d = create_driver(cfg)
278279
t = Time.now.to_i
279-
d.run do
280+
d.run(default_tag: DEFAULT_TAG) do
280281
feed_lines(d, t, JAVA_EXC)
281282
end
282283
expected = JAVA_EXC
283-
assert_equal(make_logs(t, *expected), d.events)
284+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
284285
end
285286

286287
def test_force_line_breaks_true
@@ -289,7 +290,7 @@ def test_force_line_breaks_true
289290
force_line_breaks true)
290291
d = create_driver(cfg)
291292
t = Time.now.to_i
292-
d.run do
293+
d.run(default_tag: DEFAULT_TAG) do
293294
feed_lines_without_line_breaks(d, t, JAVA_EXC)
294295
end
295296
# Expected: the first two lines of the exception are buffered and combined.
@@ -299,7 +300,7 @@ def test_force_line_breaks_true
299300
# are buffered and combined. So are the first two lines of the second
300301
# exception. Then the rest is logged line-by-line.
301302
expected = JAVA_EXC.chomp
302-
assert_equal(make_logs(t, *expected), d.events)
303+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
303304
end
304305

305306
def test_flush_after_max_lines
@@ -308,7 +309,7 @@ def test_flush_after_max_lines
308309
max_lines 2)
309310
d = create_driver(cfg)
310311
t = Time.now.to_i
311-
d.run do
312+
d.run(default_tag: DEFAULT_TAG) do
312313
feed_lines(d, t, PYTHON_EXC, JAVA_EXC)
313314
end
314315
# Expected: the first two lines of the exception are buffered and combined.
@@ -320,7 +321,7 @@ def test_flush_after_max_lines
320321
expected = [PYTHON_EXC.lines[0..1].join] + PYTHON_EXC.lines[2..-1] + \
321322
[JAVA_EXC.lines[0..1].join] + [JAVA_EXC.lines[2..3].join] + \
322323
JAVA_EXC.lines[4..-1]
323-
assert_equal(make_logs(t, *expected), d.events)
324+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
324325
end
325326

326327
def test_separate_streams
@@ -329,7 +330,7 @@ def test_separate_streams
329330
stream stream)
330331
d = create_driver(cfg)
331332
t = Time.now.to_i
332-
d.run do
333+
d.run(default_tag: DEFAULT_TAG) do
333334
feed_lines(d, t, JAVA_EXC.lines[0], stream: 'java')
334335
feed_lines(d, t, PYTHON_EXC.lines[0..1].join, stream: 'python')
335336
feed_lines(d, t, JAVA_EXC.lines[1..-1].join, stream: 'java')
@@ -341,10 +342,10 @@ def test_separate_streams
341342
# because they belong to different streams.
342343
# Note that the Java exception is only detected when 'something else'
343344
# is processed.
344-
expected = make_logs(t, JAVA_EXC, stream: 'java') +
345-
make_logs(t, PYTHON_EXC, stream: 'python') +
346-
make_logs(t, JAVA_EXC, stream: 'java') +
347-
make_logs(t, 'something else', stream: 'java')
345+
expected = make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
346+
make_logs(DEFAULT_TAG_STRIPPED, t, PYTHON_EXC, stream: 'python') +
347+
make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
348+
make_logs(DEFAULT_TAG_STRIPPED, t, 'something else', stream: 'java')
348349
assert_equal(expected, d.events)
349350
end
350351
end

0 commit comments

Comments
 (0)