lib/sidekiq/testing.rb in sidekiq-4.0.1 vs lib/sidekiq/testing.rb in sidekiq-4.0.2

- line removed (present only in 4.0.1, the old version)
+ line added (present only in 4.0.2, the new version)

@@ -66,27 +66,128 @@ alias_method :raw_push_real, :raw_push def raw_push(payloads) if Sidekiq::Testing.fake? payloads.each do |job| - job['class'].constantize.jobs << Sidekiq.load_json(Sidekiq.dump_json(job)) + Queues.push(job['queue'], job['class'], Sidekiq.load_json(Sidekiq.dump_json(job))) end true elsif Sidekiq::Testing.inline? payloads.each do |job| - job['jid'] ||= SecureRandom.hex(12) klass = job['class'].constantize - klass.jobs.unshift Sidekiq.load_json(Sidekiq.dump_json(job)) - klass.perform_one + job['id'] ||= SecureRandom.hex(12) + job_hash = Sidekiq.load_json(Sidekiq.dump_json(job)) + klass.process_job(job_hash) end true else raw_push_real(payloads) end end end + module Queues + ## + # The Queues class is only for testing the fake queue implementation. + # There are 2 data structures involved in tandem. This is due to the + # RSpec syntax of change(QueueWorker.jobs, :size). It keeps a reference + # to the array. Because the array was derived from a filter of the total + # jobs enqueued, it appeared as though the array didn't change. + # + # To solve this, we'll keep 2 hashes containing the jobs. One with keys based + # on the queue, and another with keys of the worker names, so the array for + # QueueWorker.jobs is a straight reference to a real array. 
+ # + # Queue-based hash: + # + # { + # "default"=>[ + # { + # "class"=>"TestTesting::QueueWorker", + # "args"=>[1, 2], + # "retry"=>true, + # "queue"=>"default", + # "jid"=>"abc5b065c5c4b27fc1102833", + # "created_at"=>1447445554.419934 + # } + # ] + # } + # + # Worker-based hash: + # + # { + # "TestTesting::QueueWorker"=>[ + # { + # "class"=>"TestTesting::QueueWorker", + # "args"=>[1, 2], + # "retry"=>true, + # "queue"=>"default", + # "jid"=>"abc5b065c5c4b27fc1102833", + # "created_at"=>1447445554.419934 + # } + # ] + # } + # + # Example: + # + # require 'sidekiq/testing' + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # assert_equal 1, Sidekiq::Queues["default"].size + # assert_equal :something, Sidekiq::Queues["default"].first['args'][0] + # + # You can also clear all workers' jobs: + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # Sidekiq::Queues.clear_all + # assert_equal 0, Sidekiq::Queues["default"].size + # + # This can be useful to make sure jobs don't linger between tests: + # + # RSpec.configure do |config| + # config.before(:each) do + # Sidekiq::Queues.clear_all + # end + # end + # + class << self + def [](queue) + jobs_by_queue[queue] + end + + def push(queue, klass, job) + jobs_by_queue[queue] << job + jobs_by_worker[klass] << job + end + + def jobs_by_queue + @jobs_by_queue ||= Hash.new { |hash, key| hash[key] = [] } + end + + def jobs_by_worker + @jobs_by_worker ||= Hash.new { |hash, key| hash[key] = [] } + end + + def delete_for(jid, queue, klass) + jobs_by_queue[queue].delete_if { |job| job["jid"] == jid } + jobs_by_worker[klass].delete_if { |job| job["jid"] == jid } + end + + def clear_for(queue, klass) + jobs_by_queue[queue].clear + jobs_by_worker[klass].clear + end + + def clear_all + jobs_by_queue.clear + jobs_by_worker.clear + end + end + end + module Worker ## # The Sidekiq testing infrastructure overrides perform_async # so that it does 
not actually touch the network. Instead it # stores the asynchronous jobs in a per-class array so that @@ -141,32 +242,40 @@ # When I sign up as "foo@example.com" # Then I should receive a welcome email to "foo@example.com" # module ClassMethods + # Queue for this worker + def queue + self.sidekiq_options["queue"] + end + # Jobs queued for this worker def jobs - Worker.jobs[self] + Queues.jobs_by_worker[self.to_s] end # Clear all jobs for this worker def clear - jobs.clear + Queues.clear_for(queue, self.to_s) end # Drain and run all jobs for this worker def drain - while job = jobs.shift do - process_job(job) + while jobs.any? + next_job = jobs.first + Queues.delete_for(next_job["jid"], queue, self.to_s) + process_job(next_job) end end # Pop out a single job and perform it def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? - job = jobs.shift - process_job(job) + next_job = jobs.first + Queues.delete_for(next_job["jid"], queue, self.to_s) + process_job(next_job) end def process_job(job) worker = new worker.jid = job['jid'] @@ -181,21 +290,25 @@ end end class << self def jobs # :nodoc: - @jobs ||= Hash.new { |hash, key| hash[key] = [] } + Queues.jobs_by_queue.values.flatten end # Clear all queued jobs across all workers def clear_all - jobs.clear + Queues.clear_all end # Drain all queued jobs across all workers def drain_all - until jobs.values.all?(&:empty?) do - jobs.keys.each(&:drain) + while jobs.any? + worker_classes = jobs.map { |job| job["class"] }.uniq + + worker_classes.each do |worker_class| + worker_class.constantize.drain + end end end end end end