Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
source 'http://rubygems.org'

gem 'rake'

# Specify your gem's dependencies in orgno_validator.gemspec
gemspec
37 changes: 37 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
PATH
remote: .
specs:
mongo_queue (0.1.3)
bson (= 1.1.4)
bson_ext (= 1.1.4)
mongo (= 1.1.4)

GEM
remote: http://rubygems.org/
specs:
bson (1.1.4)
bson_ext (1.1.4)
diff-lcs (1.1.3)
json (1.6.2)
mongo (1.1.4)
bson (>= 1.1.1)
rake (0.9.2.2)
rdoc (3.11)
json (~> 1.4)
rspec (2.7.0)
rspec-core (~> 2.7.0)
rspec-expectations (~> 2.7.0)
rspec-mocks (~> 2.7.0)
rspec-core (2.7.1)
rspec-expectations (2.7.0)
diff-lcs (~> 1.1.2)
rspec-mocks (2.7.0)

PLATFORMS
ruby

DEPENDENCIES
mongo_queue!
rake
rdoc
rspec
45 changes: 10 additions & 35 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,46 +1,21 @@
require 'rubygems'
require 'rake'
require 'bundler'
require 'rdoc/task'
require 'rspec/core/rake_task'

begin
require 'jeweler'
Jeweler::Tasks.new do |gem|
gem.name = "mongo_queue"
gem.summary = "Mongo based message/job queue"
gem.description = "An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine."
gem.email = "[email protected]"
gem.homepage = "http://github.com/skiz/mongo_queue"
gem.authors = ["Josh Martin"]
gem.add_development_dependency "rspec", ">= 0"
end
Jeweler::GemcutterTasks.new
rescue LoadError
puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler"
end

require 'spec/rake/spectask'
Spec::Rake::SpecTask.new(:spec) do |spec|
spec.libs << 'lib' << 'spec'
spec.spec_files = FileList['spec/*_spec.rb']
end
Bundler::GemHelper.install_tasks

Spec::Rake::SpecTask.new(:rcov) do |spec|
spec.libs << 'lib' << 'spec'
spec.pattern = 'spec/*_spec.rb'
spec.rcov = true
RSpec::Core::RakeTask.new do |t|
t.rspec_opts = %w(--format documentation --colour)
end

task :spec => :check_dependencies

task :default => :spec

require 'rake/rdoctask'
Rake::RDocTask.new do |rdoc|
RDoc::Task.new do |rdoc|
version = File.exist?('VERSION') ? File.read('VERSION') : ""

rdoc.rdoc_dir = 'rdoc'
# rdoc.options << '--diagram'
rdoc.title = "Epoch Notifier #{version}"
rdoc.title = "mongo_queue #{version}"
rdoc.rdoc_files.include('README*')
rdoc.rdoc_files.include('lib/**/*.rb')
end

require 'metric_fu'
task :default => 'spec'
21 changes: 13 additions & 8 deletions lib/mongo_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@ def flush!
# queue.insert(:name => 'Billy', :email => '[email protected]', :message => 'Here is the thing you asked for')
def insert(hash)
id = collection.insert DEFAULT_INSERT.merge(hash)
collection.find_one(:_id => Mongo::ObjectID.from_string(id.to_s))
collection.find_one(:_id => BSON::ObjectId.from_string(id.to_s))
end

# Lock and return the next queue message if one is available. Returns nil if none are available. Be sure to
# review the README.rdoc regarding proper usage of the locking process identifier (locked_by).
# Example:
# locked_doc = queue.lock_next(Thread.current.object_id)
def lock_next(locked_by)
cmd = OrderedHash.new
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = @config[:collection]
cmd['update'] = {'$set' => {:locked_by => locked_by, :locked_at => Time.now.utc}}
cmd['query'] = {:locked_by => nil, :locked_by => nil, :attempts => {'$lt' => @config[:attempts]}}
cmd['sort'] = sort_hash
cmd['limit'] = 1
cmd['new'] = true
value_of collection.db.command(cmd)

begin
value_of collection.db.command(cmd)
rescue Mongo::OperationFailure
nil
end
end

# Removes stale locks that have exceeded the timeout and places them back in the queue.
Expand All @@ -70,10 +75,10 @@ def cleanup!

# Release a lock on the specified document and allow it to become available again.
def release(doc, locked_by)
cmd = OrderedHash.new
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = @config[:collection]
cmd['update'] = {'$set' => {:locked_by => nil, :locked_at => nil}}
cmd['query'] = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)}
cmd['query'] = {:locked_by => locked_by, :_id => BSON::ObjectId.from_string(doc['_id'].to_s)}
cmd['limit'] = 1
cmd['new'] = true
value_of collection.db.command(cmd)
Expand All @@ -82,9 +87,9 @@ def release(doc, locked_by)
# Remove the document from the queue. This should be called when the work is done and the document is no longer needed.
# You must provide the process identifier that the document was locked with to complete it.
def complete(doc, locked_by)
cmd = OrderedHash.new
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = @config[:collection]
cmd['query'] = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)}
cmd['query'] = {:locked_by => locked_by, :_id => BSON::ObjectId.from_string(doc['_id'].to_s)}
cmd['remove'] = true
cmd['limit'] = 1
value_of collection.db.command(cmd)
Expand Down Expand Up @@ -127,7 +132,7 @@ def stats
protected

def sort_hash #:nodoc:
sh = OrderedHash.new
sh = BSON::OrderedHash.new
sh['priority'] = -1 ; sh
end

Expand Down
69 changes: 22 additions & 47 deletions mongo_queue.gemspec
Original file line number Diff line number Diff line change
@@ -1,54 +1,29 @@
# Generated by jeweler
# DO NOT EDIT THIS FILE DIRECTLY
# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command
# -*- encoding: utf-8 -*-

Gem::Specification.new do |s|
s.name = %q{mongo_queue}
s.version = "0.1.1"
Gem::Specification.new do |gem|
gem.authors = ["Josh Martin"]
gem.email = %q{[email protected]}
gem.description = %q{An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.}
gem.summary = %q{Mongo based message/job queue}
gem.homepage = %q{http://github.com/skiz/mongo_queue}
gem.date = %q{2010-03-30}

s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Josh Martin"]
s.date = %q{2010-03-30}
s.description = %q{An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.}
s.email = %q{[email protected]}
s.extra_rdoc_files = [
gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
gem.files = `git ls-files`.split("\n")
gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
gem.name = %q{mongo_queue}
gem.require_paths = ["lib"]
gem.version = '0.1.3'
gem.add_development_dependency("rspec", ">= 0")
gem.add_development_dependency("rdoc", ">= 0")
gem.add_dependency("mongo", ">= 1.5")
gem.add_dependency("bson_ext", ">= 1.5")


gem.rdoc_options = ["--charset=UTF-8"]
gem.extra_rdoc_files = [
"LICENSE",
"README.rdoc"
"README.rdoc"
]
s.files = [
".document",
".gitignore",
"LICENSE",
"README.rdoc",
"Rakefile",
"VERSION",
"lib/mongo_queue.rb",
"mongo_queue.gemspec",
"spec/mongo_queue_spec.rb",
"spec/spec_helper.rb"
]
s.homepage = %q{http://github.com/skiz/mongo_queue}
s.rdoc_options = ["--charset=UTF-8"]
s.require_paths = ["lib"]
s.rubygems_version = %q{1.3.5}
s.summary = %q{Mongo based message/job queue}
s.test_files = [
"spec/mongo_queue_spec.rb",
"spec/spec_helper.rb"
]

if s.respond_to? :specification_version then
current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
s.specification_version = 3

if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then
s.add_development_dependency(%q<rspec>, [">= 0"])
else
s.add_dependency(%q<rspec>, [">= 0"])
end
else
s.add_dependency(%q<rspec>, [">= 0"])
end
end

70 changes: 35 additions & 35 deletions spec/mongo_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,40 @@

describe Mongo::Queue do

before(:suite) do
before(:all) do
opts = {
:database => 'mongo_queue_spec',
:collection => 'spec',
:attempts => 4,
:timeout => 60}
@@db = Mongo::Connection.new('localhost', nil, :pool_size => 4)
@@queue = Mongo::Queue.new(@@db, opts)
@db = Mongo::Connection.new('localhost', nil, :pool_size => 4)
@queue = Mongo::Queue.new(@db, opts)
end

before(:each) do
@@queue.flush!
@queue.flush!
end

describe "Configuration" do

it "should set the connection" do
@@queue.connection.should be(@@db)
@queue.connection.should be(@db)
end

it "should allow database option" do
@@queue.config[:database].should eql('mongo_queue_spec')
@queue.config[:database].should eql('mongo_queue_spec')
end

it "should allow collection option" do
@@queue.config[:collection].should eql('spec')
@queue.config[:collection].should eql('spec')
end

it "should allow attempts option" do
@@queue.config[:attempts].should eql(4)
@queue.config[:attempts].should eql(4)
end

it "should allow timeout option" do
@@queue.config[:timeout].should eql(60)
@queue.config[:timeout].should eql(60)
end

it "should have a sane set of defaults" do
Expand All @@ -48,8 +48,8 @@

describe "Inserting a Job" do
before(:each) do
@@queue.insert(:message => 'MongoQueueSpec')
@item = @@queue.send(:collection).find_one
@queue.insert(:message => 'MongoQueueSpec')
@item = @queue.send(:collection).find_one
end

it "should set priority to 0 by default" do
Expand All @@ -75,7 +75,7 @@

describe "Queue Information" do
it "should provide a convenience method to retrieve stats about the queue" do
@@queue.stats.should eql({
@queue.stats.should eql({
:locked => 0,
:available => 0,
:errors => 0,
Expand All @@ -84,11 +84,11 @@
end

it "should calculate properly" do
@first = @@queue.insert(:msg => 'First', :attempts => 4)
@second = @@queue.insert(:msg => 'Second', :priority => 2)
@third = @@queue.insert(:msg => 'Third', :priority => 6)
@fourth = @@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
@@queue.stats.should eql({
@first = @queue.insert(:msg => 'First', :attempts => 4)
@second = @queue.insert(:msg => 'Second', :priority => 2)
@third = @queue.insert(:msg => 'Third', :priority => 6)
@fourth = @queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
@queue.stats.should eql({
:locked => 1,
:available => 2,
:errors => 1,
Expand All @@ -99,49 +99,49 @@

describe "Working with the queue" do
before(:each) do
@first = @@queue.insert(:msg => 'First')
@second = @@queue.insert(:msg => 'Second', :priority => 2)
@third = @@queue.insert(:msg => 'Third', :priority => 6)
@fourth = @@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
@first = @queue.insert(:msg => 'First')
@second = @queue.insert(:msg => 'Second', :priority => 2)
@third = @queue.insert(:msg => 'Third', :priority => 6)
@fourth = @queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
end

it "should lock the next document by priority" do
doc = @@queue.lock_next('Test')
doc = @queue.lock_next('Test')
doc['msg'].should eql('Third')
end

it "should release and relock the next document" do
@@queue.release(@fourth, 'Example')
@@queue.lock_next('Bob')['msg'].should eql('Fourth')
@queue.release(@fourth, 'Example')
@queue.lock_next('Bob')['msg'].should eql('Fourth')
end

it "should remove completed items" do
doc = @@queue.lock_next('grr')
@@queue.complete(doc,'grr')
@@queue.lock_next('grr')['msg'].should eql('Second')
doc = @queue.lock_next('grr')
@queue.complete(doc,'grr')
@queue.lock_next('grr')['msg'].should eql('Second')
end

it "should return nil when unable to lock" do
4.times{ @@queue.lock_next('blah') }
@@queue.lock_next('blah').should eql(nil)
4.times{ @queue.lock_next('blah') }
@queue.lock_next('blah').should eql(nil)
end
end

describe "Error Handling" do
it "should allow document error handling" do
doc = @@queue.insert(:stuff => 'Broken')
2.times{ @@queue.error(doc, 'I think I broke it') }
doc = @@queue.lock_next('Money')
doc = @queue.insert(:stuff => 'Broken')
2.times{ @queue.error(doc, 'I think I broke it') }
doc = @queue.lock_next('Money')
doc['attempts'].should eql(2)
doc['last_error'].should eql('I think I broke it')
end
end

describe "Cleaning up" do
it "should remove all of the stale locks" do
@@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
@@queue.cleanup!
@@queue.lock_next('Foo')['msg'].should eql('Fourth')
@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99)
@queue.cleanup!
@queue.lock_next('Foo')['msg'].should eql('Fourth')
end
end

Expand Down
Loading