diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..a79822e --- /dev/null +++ b/Gemfile @@ -0,0 +1,6 @@ +source 'http://rubygems.org' + +gem 'rake' + +# Specify your gem's dependencies in orgno_validator.gemspec +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..966ddc5 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,37 @@ +PATH + remote: . + specs: + mongo_queue (0.1.3) + bson (= 1.1.4) + bson_ext (= 1.1.4) + mongo (= 1.1.4) + +GEM + remote: http://rubygems.org/ + specs: + bson (1.1.4) + bson_ext (1.1.4) + diff-lcs (1.1.3) + json (1.6.2) + mongo (1.1.4) + bson (>= 1.1.1) + rake (0.9.2.2) + rdoc (3.11) + json (~> 1.4) + rspec (2.7.0) + rspec-core (~> 2.7.0) + rspec-expectations (~> 2.7.0) + rspec-mocks (~> 2.7.0) + rspec-core (2.7.1) + rspec-expectations (2.7.0) + diff-lcs (~> 1.1.2) + rspec-mocks (2.7.0) + +PLATFORMS + ruby + +DEPENDENCIES + mongo_queue! + rake + rdoc + rspec diff --git a/Rakefile b/Rakefile index cb6debf..d39cc74 100644 --- a/Rakefile +++ b/Rakefile @@ -1,46 +1,21 @@ -require 'rubygems' -require 'rake' +require 'bundler' +require 'rdoc/task' +require 'rspec/core/rake_task' -begin - require 'jeweler' - Jeweler::Tasks.new do |gem| - gem.name = "mongo_queue" - gem.summary = "Mongo based message/job queue" - gem.description = "An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine." - gem.email = "jmartin@webwideconsulting.com" - gem.homepage = "http://github.com/skiz/mongo_queue" - gem.authors = ["Josh Martin"] - gem.add_development_dependency "rspec", ">= 0" - end - Jeweler::GemcutterTasks.new -rescue LoadError - puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler" -end - -require 'spec/rake/spectask' -Spec::Rake::SpecTask.new(:spec) do |spec| - spec.libs << 'lib' << 'spec' - spec.spec_files = FileList['spec/*_spec.rb'] -end +Bundler::GemHelper.install_tasks -Spec::Rake::SpecTask.new(:rcov) do |spec| - spec.libs << 'lib' << 'spec' - spec.pattern = 'spec/*_spec.rb' - spec.rcov = true +RSpec::Core::RakeTask.new do |t| + t.rspec_opts = %w(--format documentation --colour) end -task :spec => :check_dependencies -task :default => :spec - -require 'rake/rdoctask' -Rake::RDocTask.new do |rdoc| +RDoc::Task.new do |rdoc| version = File.exist?('VERSION') ? File.read('VERSION') : "" + rdoc.rdoc_dir = 'rdoc' - # rdoc.options << '--diagram' - rdoc.title = "Epoch Notifier #{version}" + rdoc.title = "mongo_queue #{version}" rdoc.rdoc_files.include('README*') rdoc.rdoc_files.include('lib/**/*.rb') end -require 'metric_fu' +task :default => 'spec' \ No newline at end of file diff --git a/lib/mongo_queue.rb b/lib/mongo_queue.rb index 0e9b67a..89d54ec 100644 --- a/lib/mongo_queue.rb +++ b/lib/mongo_queue.rb @@ -40,7 +40,7 @@ def flush! # queue.insert(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for') def insert(hash) id = collection.insert DEFAULT_INSERT.merge(hash) - collection.find_one(:_id => Mongo::ObjectID.from_string(id.to_s)) + collection.find_one(:_id => BSON::ObjectId.from_string(id.to_s)) end # Lock and return the next queue message if one is available. Returns nil if none are available. Be sure to @@ -48,14 +48,19 @@ def insert(hash) # Example: # locked_doc = queue.lock_next(Thread.current.object_id) def lock_next(locked_by) - cmd = OrderedHash.new + cmd = BSON::OrderedHash.new cmd['findandmodify'] = @config[:collection] cmd['update'] = {'$set' => {:locked_by => locked_by, :locked_at => Time.now.utc}} cmd['query'] = {:locked_by => nil, :locked_by => nil, :attempts => {'$lt' => @config[:attempts]}} cmd['sort'] = sort_hash cmd['limit'] = 1 cmd['new'] = true - value_of collection.db.command(cmd) + + begin + value_of collection.db.command(cmd) + rescue Mongo::OperationFailure + nil + end end # Removes stale locks that have exceeded the timeout and places them back in the queue. @@ -70,10 +75,10 @@ def cleanup! # Release a lock on the specified document and allow it to become available again. def release(doc, locked_by) - cmd = OrderedHash.new + cmd = BSON::OrderedHash.new cmd['findandmodify'] = @config[:collection] cmd['update'] = {'$set' => {:locked_by => nil, :locked_at => nil}} - cmd['query'] = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)} + cmd['query'] = {:locked_by => locked_by, :_id => BSON::ObjectId.from_string(doc['_id'].to_s)} cmd['limit'] = 1 cmd['new'] = true value_of collection.db.command(cmd) @@ -82,9 +87,9 @@ def release(doc, locked_by) # Remove the document from the queue. This should be called when the work is done and the document is no longer needed. # You must provide the process identifier that the document was locked with to complete it. def complete(doc, locked_by) - cmd = OrderedHash.new + cmd = BSON::OrderedHash.new cmd['findandmodify'] = @config[:collection] - cmd['query'] = {:locked_by => locked_by, :_id => Mongo::ObjectID.from_string(doc['_id'].to_s)} + cmd['query'] = {:locked_by => locked_by, :_id => BSON::ObjectId.from_string(doc['_id'].to_s)} cmd['remove'] = true cmd['limit'] = 1 value_of collection.db.command(cmd) @@ -127,7 +132,7 @@ def stats protected def sort_hash #:nodoc: - sh = OrderedHash.new + sh = BSON::OrderedHash.new sh['priority'] = -1 ; sh end diff --git a/mongo_queue.gemspec b/mongo_queue.gemspec index d64139b..46bd11c 100644 --- a/mongo_queue.gemspec +++ b/mongo_queue.gemspec @@ -1,54 +1,29 @@ -# Generated by jeweler -# DO NOT EDIT THIS FILE DIRECTLY -# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command # -*- encoding: utf-8 -*- -Gem::Specification.new do |s| - s.name = %q{mongo_queue} - s.version = "0.1.1" +Gem::Specification.new do |gem| + gem.authors = ["Josh Martin"] + gem.email = %q{jmartin@webwideconsulting.com} + gem.description = %q{An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.} + gem.summary = %q{Mongo based message/job queue} + gem.homepage = %q{http://github.com/skiz/mongo_queue} + gem.date = %q{2010-03-30} - s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= - s.authors = ["Josh Martin"] - s.date = %q{2010-03-30} - s.description = %q{An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.} - s.email = %q{jmartin@webwideconsulting.com} - s.extra_rdoc_files = [ + gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } + gem.files = `git ls-files`.split("\n") + gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") + gem.name = %q{mongo_queue} + gem.require_paths = ["lib"] + gem.version = '0.1.3' + gem.add_development_dependency("rspec", ">= 0") + gem.add_development_dependency("rdoc", ">= 0") + gem.add_dependency("mongo", ">= 1.5") + gem.add_dependency("bson_ext", ">= 1.5") + + + gem.rdoc_options = ["--charset=UTF-8"] + gem.extra_rdoc_files = [ "LICENSE", - "README.rdoc" + "README.rdoc" ] - s.files = [ - ".document", - ".gitignore", - "LICENSE", - "README.rdoc", - "Rakefile", - "VERSION", - "lib/mongo_queue.rb", - "mongo_queue.gemspec", - "spec/mongo_queue_spec.rb", - "spec/spec_helper.rb" - ] - s.homepage = %q{http://github.com/skiz/mongo_queue} - s.rdoc_options = ["--charset=UTF-8"] - s.require_paths = ["lib"] - s.rubygems_version = %q{1.3.5} - s.summary = %q{Mongo based message/job queue} - s.test_files = [ - "spec/mongo_queue_spec.rb", - "spec/spec_helper.rb" - ] - - if s.respond_to? :specification_version then - current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION - s.specification_version = 3 - - if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then - s.add_development_dependency(%q, [">= 0"]) - else - s.add_dependency(%q, [">= 0"]) - end - else - s.add_dependency(%q, [">= 0"]) - end end diff --git a/spec/mongo_queue_spec.rb b/spec/mongo_queue_spec.rb index f454d31..de10939 100644 --- a/spec/mongo_queue_spec.rb +++ b/spec/mongo_queue_spec.rb @@ -2,40 +2,40 @@ describe Mongo::Queue do - before(:suite) do + before(:all) do opts = { :database => 'mongo_queue_spec', :collection => 'spec', :attempts => 4, :timeout => 60} - @@db = Mongo::Connection.new('localhost', nil, :pool_size => 4) - @@queue = Mongo::Queue.new(@@db, opts) + @db = Mongo::Connection.new('localhost', nil, :pool_size => 4) + @queue = Mongo::Queue.new(@db, opts) end before(:each) do - @@queue.flush! + @queue.flush! end describe "Configuration" do it "should set the connection" do - @@queue.connection.should be(@@db) + @queue.connection.should be(@db) end it "should allow database option" do - @@queue.config[:database].should eql('mongo_queue_spec') + @queue.config[:database].should eql('mongo_queue_spec') end it "should allow collection option" do - @@queue.config[:collection].should eql('spec') + @queue.config[:collection].should eql('spec') end it "should allow attempts option" do - @@queue.config[:attempts].should eql(4) + @queue.config[:attempts].should eql(4) end it "should allow timeout option" do - @@queue.config[:timeout].should eql(60) + @queue.config[:timeout].should eql(60) end it "should have a sane set of defaults" do @@ -48,8 +48,8 @@ describe "Inserting a Job" do before(:each) do - @@queue.insert(:message => 'MongoQueueSpec') - @item = @@queue.send(:collection).find_one + @queue.insert(:message => 'MongoQueueSpec') + @item = @queue.send(:collection).find_one end it "should set priority to 0 by default" do @@ -75,7 +75,7 @@ describe "Queue Information" do it "should provide a convenience method to retrieve stats about the queue" do - @@queue.stats.should eql({ + @queue.stats.should eql({ :locked => 0, :available => 0, :errors => 0, @@ -84,11 +84,11 @@ end it "should calculate properly" do - @first = @@queue.insert(:msg => 'First', :attempts => 4) - @second = @@queue.insert(:msg => 'Second', :priority => 2) - @third = @@queue.insert(:msg => 'Third', :priority => 6) - @fourth = @@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) - @@queue.stats.should eql({ + @first = @queue.insert(:msg => 'First', :attempts => 4) + @second = @queue.insert(:msg => 'Second', :priority => 2) + @third = @queue.insert(:msg => 'Third', :priority => 6) + @fourth = @queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) + @queue.stats.should eql({ :locked => 1, :available => 2, :errors => 1, @@ -99,39 +99,39 @@ describe "Working with the queue" do before(:each) do - @first = @@queue.insert(:msg => 'First') - @second = @@queue.insert(:msg => 'Second', :priority => 2) - @third = @@queue.insert(:msg => 'Third', :priority => 6) - @fourth = @@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) + @first = @queue.insert(:msg => 'First') + @second = @queue.insert(:msg => 'Second', :priority => 2) + @third = @queue.insert(:msg => 'Third', :priority => 6) + @fourth = @queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) end it "should lock the next document by priority" do - doc = @@queue.lock_next('Test') + doc = @queue.lock_next('Test') doc['msg'].should eql('Third') end it "should release and relock the next document" do - @@queue.release(@fourth, 'Example') - @@queue.lock_next('Bob')['msg'].should eql('Fourth') + @queue.release(@fourth, 'Example') + @queue.lock_next('Bob')['msg'].should eql('Fourth') end it "should remove completed items" do - doc = @@queue.lock_next('grr') - @@queue.complete(doc,'grr') - @@queue.lock_next('grr')['msg'].should eql('Second') + doc = @queue.lock_next('grr') + @queue.complete(doc,'grr') + @queue.lock_next('grr')['msg'].should eql('Second') end it "should return nil when unable to lock" do - 4.times{ @@queue.lock_next('blah') } - @@queue.lock_next('blah').should eql(nil) + 4.times{ @queue.lock_next('blah') } + @queue.lock_next('blah').should eql(nil) end end describe "Error Handling" do it "should allow document error handling" do - doc = @@queue.insert(:stuff => 'Broken') - 2.times{ @@queue.error(doc, 'I think I broke it') } - doc = @@queue.lock_next('Money') + doc = @queue.insert(:stuff => 'Broken') + 2.times{ @queue.error(doc, 'I think I broke it') } + doc = @queue.lock_next('Money') doc['attempts'].should eql(2) doc['last_error'].should eql('I think I broke it') end @@ -139,9 +139,9 @@ describe "Cleaning up" do it "should remove all of the stale locks" do - @@queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) - @@queue.cleanup! - @@queue.lock_next('Foo')['msg'].should eql('Fourth') + @queue.insert(:msg => 'Fourth', :locked_by => 'Example', :locked_at => Time.now.utc - 60 * 60 * 60, :priority => 99) + @queue.cleanup! + @queue.lock_next('Foo')['msg'].should eql('Fourth') end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d677ba8..cdf6e70 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,8 +3,8 @@ require 'rubygems' require 'mongo' -require 'lib/mongo_queue' - -Spec::Runner.configure do |config| +require 'mongo_queue' + +RSpec.configure do |config| #MongoQueue.log.level = Logger::FATAL end