lib/sidekiq/testing.rb in sidekiq-4.0.1 vs lib/sidekiq/testing.rb in sidekiq-4.0.2
- old
+ new
@@ -66,27 +66,128 @@
alias_method :raw_push_real, :raw_push
def raw_push(payloads)
if Sidekiq::Testing.fake?
payloads.each do |job|
- job['class'].constantize.jobs << Sidekiq.load_json(Sidekiq.dump_json(job))
+ Queues.push(job['queue'], job['class'], Sidekiq.load_json(Sidekiq.dump_json(job)))
end
true
elsif Sidekiq::Testing.inline?
payloads.each do |job|
- job['jid'] ||= SecureRandom.hex(12)
klass = job['class'].constantize
- klass.jobs.unshift Sidekiq.load_json(Sidekiq.dump_json(job))
- klass.perform_one
+ job['id'] ||= SecureRandom.hex(12)
+ job_hash = Sidekiq.load_json(Sidekiq.dump_json(job))
+ klass.process_job(job_hash)
end
true
else
raw_push_real(payloads)
end
end
end
+ module Queues
+ ##
+ # The Queues class is only for testing the fake queue implementation.
+ # There are 2 data structures involved in tandem. This is due to the
+ # Rspec syntax of change(QueueWorker.jobs, :size). It keeps a reference
+ # to the array. Because the array was dervied from a filter of the total
+ # jobs enqueued, it appeared as though the array didn't change.
+ #
+ # To solve this, we'll keep 2 hashes containing the jobs. One with keys based
+ # on the queue, and another with keys of the worker names, so the array for
+ # QueueWorker.jobs is a straight reference to a real array.
+ #
+ # Queue-based hash:
+ #
+ # {
+ # "default"=>[
+ # {
+ # "class"=>"TestTesting::QueueWorker",
+ # "args"=>[1, 2],
+ # "retry"=>true,
+ # "queue"=>"default",
+ # "jid"=>"abc5b065c5c4b27fc1102833",
+ # "created_at"=>1447445554.419934
+ # }
+ # ]
+ # }
+ #
+ # Worker-based hash:
+ #
+ # {
+ # "TestTesting::QueueWorker"=>[
+ # {
+ # "class"=>"TestTesting::QueueWorker",
+ # "args"=>[1, 2],
+ # "retry"=>true,
+ # "queue"=>"default",
+ # "jid"=>"abc5b065c5c4b27fc1102833",
+ # "created_at"=>1447445554.419934
+ # }
+ # ]
+ # }
+ #
+ # Example:
+ #
+ # require 'sidekiq/testing'
+ #
+ # assert_equal 0, Sidekiq::Queues["default"].size
+ # HardWorker.perform_async(:something)
+ # assert_equal 1, Sidekiq::Queues["default"].size
+ # assert_equal :something, Sidekiq::Queues["default"].first['args'][0]
+ #
+ # You can also clear all workers' jobs:
+ #
+ # assert_equal 0, Sidekiq::Queues["default"].size
+ # HardWorker.perform_async(:something)
+ # Sidekiq::Queues.clear_all
+ # assert_equal 0, Sidekiq::Queues["default"].size
+ #
+ # This can be useful to make sure jobs don't linger between tests:
+ #
+ # RSpec.configure do |config|
+ # config.before(:each) do
+ # Sidekiq::Queues.clear_all
+ # end
+ # end
+ #
+ class << self
+ def [](queue)
+ jobs_by_queue[queue]
+ end
+
+ def push(queue, klass, job)
+ jobs_by_queue[queue] << job
+ jobs_by_worker[klass] << job
+ end
+
+ def jobs_by_queue
+ @jobs_by_queue ||= Hash.new { |hash, key| hash[key] = [] }
+ end
+
+ def jobs_by_worker
+ @jobs_by_worker ||= Hash.new { |hash, key| hash[key] = [] }
+ end
+
+ def delete_for(jid, queue, klass)
+ jobs_by_queue[queue].delete_if { |job| job["jid"] == jid }
+ jobs_by_worker[klass].delete_if { |job| job["jid"] == jid }
+ end
+
+ def clear_for(queue, klass)
+ jobs_by_queue[queue].clear
+ jobs_by_worker[klass].clear
+ end
+
+ def clear_all
+ jobs_by_queue.clear
+ jobs_by_worker.clear
+ end
+ end
+ end
+
module Worker
##
# The Sidekiq testing infrastructure overrides perform_async
# so that it does not actually touch the network. Instead it
# stores the asynchronous jobs in a per-class array so that
@@ -141,32 +242,40 @@
# When I sign up as "foo@example.com"
# Then I should receive a welcome email to "foo@example.com"
#
module ClassMethods
+ # Queue for this worker
+ def queue
+ self.sidekiq_options["queue"]
+ end
+
# Jobs queued for this worker
def jobs
- Worker.jobs[self]
+ Queues.jobs_by_worker[self.to_s]
end
# Clear all jobs for this worker
def clear
- jobs.clear
+ Queues.clear_for(queue, self.to_s)
end
# Drain and run all jobs for this worker
def drain
- while job = jobs.shift do
- process_job(job)
+ while jobs.any?
+ next_job = jobs.first
+ Queues.delete_for(next_job["jid"], queue, self.to_s)
+ process_job(next_job)
end
end
# Pop out a single job and perform it
def perform_one
raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
- job = jobs.shift
- process_job(job)
+ next_job = jobs.first
+ Queues.delete_for(next_job["jid"], queue, self.to_s)
+ process_job(next_job)
end
def process_job(job)
worker = new
worker.jid = job['jid']
@@ -181,21 +290,25 @@
end
end
class << self
def jobs # :nodoc:
- @jobs ||= Hash.new { |hash, key| hash[key] = [] }
+ Queues.jobs_by_queue.values.flatten
end
# Clear all queued jobs across all workers
def clear_all
- jobs.clear
+ Queues.clear_all
end
# Drain all queued jobs across all workers
def drain_all
- until jobs.values.all?(&:empty?) do
- jobs.keys.each(&:drain)
+ while jobs.any?
+ worker_classes = jobs.map { |job| job["class"] }.uniq
+
+ worker_classes.each do |worker_class|
+ worker_class.constantize.drain
+ end
end
end
end
end
end