Skip to content

Commit 7b2bd8a

Browse files
author
Nitin Goel
committed
Support for dynamic field values in prefix
1 parent 114134a commit 7b2bd8a

File tree

1 file changed

+118
-54
lines changed
  • lib/logstash/outputs

1 file changed

+118
-54
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 118 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
require "thread"
99
require "tmpdir"
1010
require "fileutils"
11+
require 'pathname'
1112

1213

1314
# INFORMATION:
@@ -59,6 +60,7 @@
5960
# size_file => 2048 (optional)
6061
# time_file => 5 (optional)
6162
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
63+
# no_event_wait => 5 (optional. Defines the number of consecutive time_file S3 upload intervals that may pass with no events for a prefix before the watch on that prefix is cleaned up)
6264
# }
6365
#
6466
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -109,15 +111,8 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
109111
# Specify how many workers to use to upload the files to S3
110112
config :upload_workers_count, :validate => :number, :default => 1
111113

112-
# Define tags to be appended to the file on the S3 bucket.
113-
#
114-
# Example:
115-
# tags => ["elasticsearch", "logstash", "kibana"]
116-
#
117-
# Will generate this file:
118-
# "ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt"
119-
#
120-
config :tags, :validate => :array, :default => []
114+
# Specify after how many time_file intervals a prefix directory should be cleaned up locally if no events are happening for it
115+
config :no_event_wait, :validate => :number, :default => 5
121116

122117
# Exposed attributes for testing purpose.
123118
attr_accessor :tempfile
@@ -148,8 +143,13 @@ def aws_service_endpoint(region)
148143
def write_on_bucket(file)
149144
# find and use the bucket
150145
bucket = @s3.buckets[@bucket]
146+
147+
first = Pathname.new @temporary_directory
148+
second = Pathname.new file
151149

152-
remote_filename = "#{@prefix}#{File.basename(file)}"
150+
remote_filename_path = second.relative_path_from first
151+
152+
remote_filename = remote_filename_path.to_s
153153

154154
@logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)
155155

@@ -169,17 +169,21 @@ def write_on_bucket(file)
169169

170170
# This method is used to create a new empty temporary file for use. Flag is needed to indicate a new time_file subsection.
171171
public
172-
def create_temporary_file
173-
filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))
174-
175-
@logger.debug("S3: Creating a new temporary file", :filename => filename)
176-
177-
@file_rotation_lock.synchronize do
178-
unless @tempfile.nil?
179-
@tempfile.close
172+
def create_temporary_file(prefix)
173+
filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix]))
174+
@file_rotation_lock[prefix].synchronize do
175+
unless @tempfile[prefix].nil?
176+
@tempfile[prefix].close
177+
end
178+
179+
if @prefixes.include? prefix
180+
dirname = File.dirname(filename)
181+
unless File.directory?(dirname)
182+
FileUtils.mkdir_p(dirname)
183+
end
184+
@logger.debug("S3: Creating a new temporary file", :filename => filename)
185+
@tempfile[prefix] = File.open(filename, "a")
180186
end
181-
182-
@tempfile = File.open(filename, "a")
183187
end
184188
end
185189

@@ -194,7 +198,11 @@ def register
194198

195199
@s3 = aws_s3_config
196200
@upload_queue = Queue.new
197-
@file_rotation_lock = Mutex.new
201+
@file_rotation_lock = Hash.new
202+
@tempfile = Hash.new
203+
@page_counter = Hash.new
204+
@prefixes = Set.new
205+
@empty_uploads = Hash.new
198206

199207
if @prefix && @prefix =~ S3_INVALID_CHARACTERS
200208
@logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
@@ -206,15 +214,14 @@ def register
206214
end
207215

208216
test_s3_write
209-
210217
restore_from_crashes if @restore == true
211-
reset_page_counter
212-
create_temporary_file
218+
#reset_page_counter
219+
#create_temporary_file
213220
configure_periodic_rotation if time_file != 0
214221
configure_upload_workers
215222

216223
@codec.on_event do |event, encoded_event|
217-
handle_event(encoded_event)
224+
handle_event(encoded_event, event)
218225
end
219226
end
220227

@@ -251,13 +258,36 @@ def restore_from_crashes
251258
end
252259
end
253260

261+
public
262+
# Tells whether the local state kept for +prefix+ should be cleaned up.
#
# prefix - String key into @empty_uploads.
#
# Returns true when the counter stored in @empty_uploads for +prefix+ is
# strictly greater than @no_event_wait, false otherwise.
#
# A prefix without a counter is never cleaned up: move_file_to_bucket calls
# this method for every file it handles, but only touches @empty_uploads for
# prefixes present in @prefixes, so the lookup may legitimately be nil.
# Comparing nil with ">" would raise NoMethodError.
def shouldcleanup(prefix)
  empty_count = @empty_uploads[prefix]
  return false if empty_count.nil?

  empty_count > @no_event_wait
end
265+
254266
public
255267
def move_file_to_bucket(file)
268+
269+
@logger.debug("S3: moving to bucket ", :file => file)
270+
271+
basepath = Pathname.new @temporary_directory
272+
dirname = Pathname.new File.dirname(file)
273+
prefixpath = dirname.relative_path_from basepath
274+
prefix = prefixpath.to_s
275+
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
276+
256277
if !File.zero?(file)
278+
if @prefixes.include? prefix
279+
@empty_uploads[prefix] = 0
280+
end
257281
write_on_bucket(file)
258282
@logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
283+
else
284+
if @prefixes.include? prefix
285+
@empty_uploads[prefix] += 1
286+
end
259287
end
260288

289+
@logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix])
290+
261291
begin
262292
File.delete(file)
263293
rescue Errno::ENOENT
@@ -266,6 +296,10 @@ def move_file_to_bucket(file)
266296
rescue Errno::EACCES
267297
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
268298
end
299+
300+
if shouldcleanup(prefix)
301+
cleanprefix(prefix)
302+
end
269303
end
270304

271305
public
@@ -292,9 +326,10 @@ def receive(event)
292326
end
293327

294328
public
295-
def rotate_events_log?
296-
@file_rotation_lock.synchronize do
297-
@tempfile.size > @size_file
329+
330+
def rotate_events_log(prefix)
331+
@file_rotation_lock[prefix].synchronize do
332+
@tempfile[prefix].size > @size_file
298333
end
299334
end
300335

@@ -304,12 +339,13 @@ def write_events_to_multiple_files?
304339
end
305340

306341
public
307-
def write_to_tempfile(event)
342+
def write_to_tempfile(event, prefix)
343+
308344
begin
309-
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
345+
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix]))
310346

311-
@file_rotation_lock.synchronize do
312-
@tempfile.syswrite(event)
347+
@file_rotation_lock[prefix].synchronize do
348+
@tempfile[prefix].syswrite(event)
313349
end
314350
rescue Errno::ENOSPC
315351
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
@@ -318,12 +354,14 @@ def write_to_tempfile(event)
318354
end
319355

320356
public
321-
def close
357+
def close()
322358
shutdown_upload_workers
323359
@periodic_rotation_thread.stop! if @periodic_rotation_thread
324-
325-
@file_rotation_lock.synchronize do
326-
@tempfile.close unless @tempfile.nil? && @tempfile.closed?
360+
361+
for prefix in @prefixes
362+
@file_rotation_lock[prefix].synchronize do
363+
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
364+
end
327365
end
328366
end
329367

@@ -334,37 +372,63 @@ def shutdown_upload_workers
334372
end
335373

336374
private
337-
def handle_event(encoded_event)
375+
def handle_event(encoded_event, event)
376+
actualprefix = event.sprintf(@prefix)
377+
if not @prefixes.to_a().include? actualprefix
378+
@file_rotation_lock[actualprefix] = Mutex.new
379+
@prefixes.add(actualprefix)
380+
reset_page_counter(actualprefix)
381+
create_temporary_file(actualprefix)
382+
@empty_uploads[actualprefix] = 0
383+
end
384+
338385
if write_events_to_multiple_files?
339-
if rotate_events_log?
340-
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
386+
if rotate_events_log(actualprefix)
387+
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix]))
341388

342-
move_file_to_bucket_async(@tempfile.path)
343-
next_page
344-
create_temporary_file
389+
move_file_to_bucket_async(@tempfile[actualprefix].path)
390+
next_page(actualprefix)
391+
create_temporary_file(actualprefix)
345392
else
346-
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
393+
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file)
347394
end
348395
end
349396

350-
write_to_tempfile(encoded_event)
397+
write_to_tempfile(encoded_event, actualprefix)
351398
end
352399

353400
private
354401
def configure_periodic_rotation
355402
@periodic_rotation_thread = Stud::Task.new do
356403
LogStash::Util::set_thread_name("<S3 periodic uploader")
357404

358-
Stud.interval(periodic_interval, :sleep_then_run => true) do
359-
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
360-
361-
move_file_to_bucket_async(@tempfile.path)
362-
next_page
363-
create_temporary_file
405+
Stud.interval(periodic_interval, :sleep_then_run => true) do
406+
407+
@tempfile.keys.each do |key|
408+
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path)
409+
move_file_to_bucket_async(@tempfile[key].path)
410+
next_page(key)
411+
create_temporary_file(key)
412+
end
413+
364414
end
365415
end
366416
end
367417

418+
private
419+
# Drops the local bookkeeping and temporary files kept for +prefix+.
#
# prefix - String; joined onto @temporary_directory to locate the prefix's
#          local directory, and used as the key into @file_rotation_lock,
#          @tempfile, @prefixes and @empty_uploads.
#
# While holding the prefix's rotation lock this:
#   * closes the prefix's open tempfile, if there is one and it is still open;
#   * deletes the regular files directly inside the prefix directory;
#   * removes the directory itself when nothing else is left in it;
#   * removes the prefix from @prefixes and @tempfile and resets its
#     @empty_uploads counter to 0.
#
# Subdirectories are left alone: a prefix may contain "/" (it is joined as a
# path), so a subdirectory can be the directory of another, nested prefix.
# File.delete would also raise on a directory and abort the cleanup after the
# tempfile was already closed.
#
# NOTE(review): the prefix's current tempfile is deleted without checking
# whether it holds data - confirm no events can be written between the last
# rotation and this call.
def cleanprefix(prefix)
  path = File.join(@temporary_directory, prefix)
  @logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
  @file_rotation_lock[prefix].synchronize do
    current = @tempfile[prefix]
    current.close unless current.nil? || current.closed?

    # The directory may already be gone; Dir.foreach would raise ENOENT.
    if File.directory?(path)
      leftovers = false
      Dir.foreach(path) do |entry|
        next if entry == '.' || entry == '..'

        entry_path = File.join(path, entry)
        if File.file?(entry_path)
          File.delete(entry_path)
        else
          leftovers = true
        end
      end
      # Only remove the directory when it is empty, so nested prefix
      # directories survive.
      Dir.rmdir(path) unless leftovers
    end

    @prefixes.delete(prefix)
    @tempfile.delete(prefix)
    @empty_uploads[prefix] = 0
  end
end
431+
368432
private
369433
def configure_upload_workers
370434
@logger.debug("S3: Configure upload workers")
@@ -397,13 +461,13 @@ def upload_worker
397461
end
398462

399463
private
400-
def next_page
401-
@page_counter += 1
464+
def next_page(key)
465+
@page_counter[key] += 1
402466
end
403467

404468
private
405-
def reset_page_counter
406-
@page_counter = 0
469+
def reset_page_counter(key)
470+
@page_counter[key] = 0
407471
end
408472

409473
private

0 commit comments

Comments
 (0)