Skip to content

Commit 2f8cb0f

Browse files
CryptophobiaCryptophobia
authored andcommitted
feat: reconcile upstream with v1 API PR
- fixing missing code to make v1 API commit work with upstream changes - fixing functions for new parameter force_line_breaks - fixing rubocop errors and exanded lineLength to 100 chars - all tests pass on ruby 2.7.4 - this combines GoogleCloudPlatform#45 (resolves GoogleCloudPlatform#45) and reconciles it for the latest code changes in upstream Signed-off-by: Cryptophobia <[email protected]>
1 parent fcaeb43 commit 2f8cb0f

File tree

6 files changed

+59
-75
lines changed

6 files changed

+59
-75
lines changed

.rubocop.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ Metrics/AbcSize:
2222
Metrics/ClassLength:
2323
Max: 1300
2424

25+
# Offense count: 2
26+
# Configuration parameters: CountComments.
27+
Metrics/LineLength:
28+
Max: 100
29+
2530
# Offense count: 1
2631
# Configuration parameters: CountComments.
2732
Metrics/ModuleLength:

README.rdoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ The plugin supports the following parameters:
7070
forwarded. If not set, incomplete exceptions stacks
7171
are not flushed.
7272

73-
[force_line_breaks] Force line breaks between each lines when comibining exception stacks.
73+
[force_line_breaks] Force line breaks between each lines when combining exception stacks.
7474
This is useful if your exception is formatted
7575
as a single line. i.e., logs retrieved from the docker's
7676
logging driver don't have any line break.

fluent-plugin-detect-exceptions.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Gem::Specification.new do |gem|
2020
gem.test_files = gem.files.grep(/^(test)/)
2121
gem.require_paths = ['lib']
2222

23-
gem.add_runtime_dependency 'fluentd', '>= 0.10'
23+
gem.add_runtime_dependency 'fluentd', '>= 1.0', '< 2'
2424

2525
gem.add_development_dependency 'flexmock', '~> 2.0'
2626
gem.add_development_dependency 'rake', '~> 10.3'

lib/fluent/plugin/out_detect_exceptions.rb

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
require 'fluent/plugin/exception_detector'
1717
require 'fluent/output'
1818

19-
module Fluent
19+
module Fluent::Plugin
2020
# This output plugin consumes a log stream of JSON objects which contain
2121
# single-line log messages. If a consecutive sequence of log messages form
2222
# an exception stack trace, they forwarded as a single, combined JSON
2323
# object. Otherwise, the input log data is forwarded as is.
24-
class DetectExceptionsOutput < Output
24+
class DetectExceptionsOutput < Fluent::Plugin::Output
25+
Fluent::Plugin.register_output('detect_exceptions', self)
26+
27+
helpers :compat_parameters, :thread, :event_emitter
28+
2529
desc 'The prefix to be removed from the input tag when outputting a record.'
2630
config_param :remove_tag_prefix, :string
2731
desc 'The field which contains the raw message text in the input JSON data.'
@@ -39,9 +43,8 @@ class DetectExceptionsOutput < Output
3943
desc 'Separate log streams by this field in the input JSON data.'
4044
config_param :stream, :string, default: ''
4145

42-
Fluent::Plugin.register_output('detect_exceptions', self)
43-
4446
def configure(conf)
47+
compat_parameters_convert(conf)
4548
super
4649

4750
@check_flush_interval = [multiline_flush_interval * 0.1, 1].max if multiline_flush_interval
@@ -59,7 +62,7 @@ def start
5962

6063
@flush_buffer_mutex = Mutex.new
6164
@stop_check = false
62-
@thread = Thread.new(&method(:check_flush_loop))
65+
thread_create(:detect_exceptions, &method(:check_flush_loop))
6366
end
6467

6568
def before_shutdown
@@ -71,15 +74,13 @@ def shutdown
7174
# Before shutdown is not available in older fluentd versions.
7275
# Hence, we make sure that we flush the buffers here as well.
7376
flush_buffers
74-
@thread.join if @multiline_flush_interval
7577
super
7678
end
7779

78-
def emit(tag, entries, chain)
80+
def process(tag, entries)
7981
entries.each do |time_sec, record|
8082
process_record(tag, time_sec, record)
8183
end
82-
chain.next
8384
end
8485

8586
private

test/helper.rb

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
require 'rubygems'
16-
require 'bundler'
17-
begin
18-
Bundler.setup(:default, :development)
19-
rescue Bundler::BundlerError => e
20-
# rubocop:disable Style/StderrPuts
21-
$stderr.puts e.message
22-
$stderr.puts 'Run `bundle install` to install missing gems'
23-
# rubocop:enable Style/StderrPuts
24-
exit e.status_code
25-
end
15+
require 'bundler/setup'
2616
require 'test/unit'
2717

28-
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
29-
$LOAD_PATH.unshift(File.dirname(__FILE__))
30-
require 'fluent/test'
31-
unless ENV.key?('VERBOSE')
32-
nulllogger = Object.new
33-
nulllogger.instance_eval do |_|
34-
def respond_to_missing?(_method, _include_private = false)
35-
true
36-
end
37-
38-
def method_missing(_method, *_args)
39-
# pass
40-
end
41-
end
42-
# global $log variable is used by fluentd
43-
$log = nulllogger # rubocop:disable Style/GlobalVars
44-
end
18+
$LOAD_PATH.unshift(File.join(__dir__, '..', 'lib'))
19+
$LOAD_PATH.unshift(__dir__)
4520

21+
require 'fluent/test'
22+
require 'fluent/test/driver/output'
4623
require 'fluent/plugin/out_detect_exceptions'

test/plugin/test_out_detect_exceptions.rb

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414

1515
require 'flexmock/test_unit'
1616
require_relative '../helper'
17+
require 'fluent/test/helpers'
1718
require 'fluent/plugin/out_detect_exceptions'
1819
require 'json'
1920

2021
class DetectExceptionsOutputTest < Test::Unit::TestCase
22+
include Fluent::Test::Helpers
23+
2124
def setup
2225
Fluent::Test.setup
2326
end
@@ -64,10 +67,8 @@ def setup
6467
from examble.rb:21:in `<main>'
6568
END_RUBY
6669

67-
def create_driver(conf = CONFIG, tag = DEFAULT_TAG)
68-
d = Fluent::Test::OutputTestDriver.new(Fluent::DetectExceptionsOutput, tag)
69-
d.configure(conf)
70-
d
70+
def create_driver(conf = CONFIG)
71+
Fluent::Test::Driver::Output.new(Fluent::Plugin::DetectExceptionsOutput).configure(conf)
7172
end
7273

7374
def log_entry(message, count, stream)
@@ -81,7 +82,7 @@ def feed_lines_without_line_breaks(driver, timestamp, *messages, stream: nil)
8182
messages.each do |m|
8283
m.each_line do |line|
8384
line.delete!("\n")
84-
driver.emit(log_entry(line, count, stream), timestamp + count)
85+
driver.feed(timestamp + count, log_entry(line, count, stream))
8586
count += 1
8687
end
8788
end
@@ -91,24 +92,24 @@ def feed_lines(driver, timestamp, *messages, stream: nil)
9192
count = 0
9293
messages.each do |m|
9394
m.each_line do |line|
94-
driver.emit(log_entry(line, count, stream), timestamp + count)
95+
driver.feed(timestamp + count, log_entry(line, count, stream))
9596
count += 1
9697
end
9798
end
9899
end
99100

100-
def run_driver(driver, *messages)
101+
def run_driver(driver, tag, *messages)
101102
t = Time.now.to_i
102-
driver.run do
103+
driver.run(default_tag: tag) do
103104
feed_lines(driver, t, *messages)
104105
end
105106
end
106107

107-
def make_logs(timestamp, *messages, stream: nil)
108+
def make_logs(tag, t, *messages, stream: nil)
108109
count = 0
109110
logs = []
110111
messages.each do |m|
111-
logs << [timestamp + count, log_entry(m, count, stream)]
112+
logs << [tag, t + count, log_entry(m, count, stream)]
112113
count += m.lines.count
113114
end
114115
logs
@@ -124,10 +125,10 @@ def test_exception_detection
124125
d = create_driver
125126
t = Time.now.to_i
126127
messages = [ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT]
127-
d.run do
128+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
128129
feed_lines(d, t, *messages)
129130
end
130-
assert_equal(make_logs(t, *messages), d.events)
131+
assert_equal(make_logs('test.tag', t, *messages), d.events)
131132
end
132133

133134
def test_ignore_nested_exceptions
@@ -184,7 +185,7 @@ def test_ignore_nested_exceptions
184185

185186
d.instance.router = router_mock
186187

187-
d.run do
188+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
188189
feed_lines(d, t, json_line_with_exception + json_line_without_exception)
189190
end
190191
end
@@ -196,11 +197,11 @@ def test_single_language_config
196197
languages java)
197198
d = create_driver(cfg)
198199
t = Time.now.to_i
199-
d.run do
200+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
200201
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
201202
end
202203
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + PYTHON_EXC.lines
203-
assert_equal(make_logs(t, *expected), d.events)
204+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
204205
end
205206

206207
def test_multi_language_config
@@ -209,11 +210,11 @@ def test_multi_language_config
209210
languages python, java)
210211
d = create_driver(cfg)
211212
t = Time.now.to_i
212-
d.run do
213+
d.run(default_tag: DEFAULT_TAG_STRIPPED) do
213214
feed_lines(d, t, ARBITRARY_TEXT, JAVA_EXC, PYTHON_EXC)
214215
end
215216
expected = ARBITRARY_TEXT.lines + [JAVA_EXC] + [PYTHON_EXC]
216-
assert_equal(make_logs(t, *expected), d.events)
217+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
217218
end
218219

219220
def test_split_exception_after_timeout
@@ -223,31 +224,31 @@ def test_split_exception_after_timeout
223224
d = create_driver(cfg)
224225
t1 = 0
225226
t2 = 0
226-
d.run do
227+
d.run(default_tag: DEFAULT_TAG) do
227228
t1 = Time.now.to_i
228229
feed_lines(d, t1, JAVA_EXC)
229230
sleep 2
230231
t2 = Time.now.to_i
231232
feed_lines(d, t2, " at x\n at y\n")
232233
end
233-
assert_equal(make_logs(t1, JAVA_EXC) +
234-
make_logs(t2, " at x\n", " at y\n"),
234+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t1, JAVA_EXC) +
235+
make_logs(DEFAULT_TAG_STRIPPED, t2, " at x\n", " at y\n"),
235236
d.events)
236237
end
237238

238239
def test_do_not_split_exception_after_pause
239240
d = create_driver
240241
t1 = 0
241242
t2 = 0
242-
d.run do
243+
d.run(default_tag: DEFAULT_TAG) do
243244
t1 = Time.now.to_i
244245
feed_lines(d, t1, JAVA_EXC)
245246
sleep 1
246247
t2 = Time.now.to_i
247248
feed_lines(d, t2, " at x\n at y\n")
248249
d.instance.before_shutdown
249250
end
250-
assert_equal(make_logs(t1, "#{JAVA_EXC} at x\n at y\n"), d.events)
251+
assert_equal(make_logs('test.tag', t1, JAVA_EXC + " at x\n at y\n"), d.events)
251252
end
252253

253254
def test_remove_tag_prefix_is_required
@@ -258,9 +259,9 @@ def test_remove_tag_prefix_is_required
258259

259260
def get_out_tags(remove_tag_prefix, original_tag)
260261
cfg = "remove_tag_prefix #{remove_tag_prefix}"
261-
d = create_driver(cfg, original_tag)
262-
run_driver(d, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
263-
d.emits.collect { |e| e[0] }.sort.uniq
262+
d = create_driver(cfg)
263+
run_driver(d, original_tag, ARBITRARY_TEXT, JAVA_EXC, ARBITRARY_TEXT)
264+
d.events.collect { |e| e[0] }.sort.uniq
264265
end
265266

266267
def test_remove_tag_prefix
@@ -278,11 +279,11 @@ def test_force_line_breaks_false
278279
force_line_breaks true)
279280
d = create_driver(cfg)
280281
t = Time.now.to_i
281-
d.run do
282+
d.run(default_tag: DEFAULT_TAG) do
282283
feed_lines(d, t, JAVA_EXC)
283284
end
284285
expected = JAVA_EXC
285-
assert_equal(make_logs(t, *expected), d.events)
286+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
286287
end
287288

288289
def test_force_line_breaks_true
@@ -291,7 +292,7 @@ def test_force_line_breaks_true
291292
force_line_breaks true)
292293
d = create_driver(cfg)
293294
t = Time.now.to_i
294-
d.run do
295+
d.run(default_tag: DEFAULT_TAG) do
295296
feed_lines_without_line_breaks(d, t, JAVA_EXC)
296297
end
297298
# Expected: the first two lines of the exception are buffered and combined.
@@ -301,7 +302,7 @@ def test_force_line_breaks_true
301302
# are buffered and combined. So are the first two lines of the second
302303
# exception. Then the rest is logged line-by-line.
303304
expected = JAVA_EXC.chomp
304-
assert_equal(make_logs(t, *expected), d.events)
305+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
305306
end
306307

307308
def test_flush_after_max_lines
@@ -310,7 +311,7 @@ def test_flush_after_max_lines
310311
max_lines 2)
311312
d = create_driver(cfg)
312313
t = Time.now.to_i
313-
d.run do
314+
d.run(default_tag: DEFAULT_TAG) do
314315
feed_lines(d, t, PYTHON_EXC, JAVA_EXC)
315316
end
316317
# Expected: the first two lines of the exception are buffered and combined.
@@ -321,8 +322,8 @@ def test_flush_after_max_lines
321322
# exception. Then the rest is logged line-by-line.
322323
expected = [PYTHON_EXC.lines[0..1].join] + PYTHON_EXC.lines[2..] + \
323324
[JAVA_EXC.lines[0..1].join] + [JAVA_EXC.lines[2..3].join] + \
324-
JAVA_EXC.lines[4..]
325-
assert_equal(make_logs(t, *expected), d.events)
325+
JAVA_EXC.lines[4..-1]
326+
assert_equal(make_logs(DEFAULT_TAG_STRIPPED, t, *expected), d.events)
326327
end
327328

328329
def test_separate_streams
@@ -331,7 +332,7 @@ def test_separate_streams
331332
stream stream)
332333
d = create_driver(cfg)
333334
t = Time.now.to_i
334-
d.run do
335+
d.run(default_tag: DEFAULT_TAG) do
335336
feed_lines(d, t, JAVA_EXC.lines[0], stream: 'java')
336337
feed_lines(d, t, PYTHON_EXC.lines[0..1].join, stream: 'python')
337338
feed_lines(d, t, JAVA_EXC.lines[1..].join, stream: 'java')
@@ -343,10 +344,10 @@ def test_separate_streams
343344
# because they belong to different streams.
344345
# Note that the Java exception is only detected when 'something else'
345346
# is processed.
346-
expected = make_logs(t, JAVA_EXC, stream: 'java') +
347-
make_logs(t, PYTHON_EXC, stream: 'python') +
348-
make_logs(t, JAVA_EXC, stream: 'java') +
349-
make_logs(t, 'something else', stream: 'java')
347+
expected = make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
348+
make_logs(DEFAULT_TAG_STRIPPED, t, PYTHON_EXC, stream: 'python') +
349+
make_logs(DEFAULT_TAG_STRIPPED, t, JAVA_EXC, stream: 'java') +
350+
make_logs(DEFAULT_TAG_STRIPPED, t, 'something else', stream: 'java')
350351
assert_equal(expected, d.events)
351352
end
352353
end

0 commit comments

Comments
 (0)