test/unit/daemon_tests.rb in qs-0.5.0 vs test/unit/daemon_tests.rb in qs-0.6.0
- old
+ new
@@ -1,14 +1,16 @@
require 'assert'
require 'qs/daemon'
require 'dat-worker-pool/worker_pool_spy'
+require 'much-plugin'
require 'ns-options/assert_macros'
require 'thread'
require 'qs/client'
require 'qs/queue'
require 'qs/queue_item'
+require 'test/support/client_spy'
module Qs::Daemon
class UnitTests < Assert::Context
desc "Qs::Daemon"
@@ -17,17 +19,20 @@
end
subject{ @daemon_class }
should have_imeths :configuration
should have_imeths :name, :pid_file
- should have_imeths :min_workers, :max_workers, :workers
- should have_imeths :on_worker_start, :on_worker_shutdown
- should have_imeths :on_worker_sleep, :on_worker_wakeup
+ should have_imeths :worker_class, :worker_params
+ should have_imeths :num_workers, :workers
should have_imeths :verbose_logging, :logger
should have_imeths :shutdown_timeout
should have_imeths :init, :error, :queue
+ should "use much-plugin" do
+ assert_includes MuchPlugin, Qs::Daemon
+ end
+
should "know its configuration" do
config = subject.configuration
assert_instance_of Configuration, config
assert_same config, subject.configuration
end
@@ -45,49 +50,38 @@
expected = Pathname.new(new_pid_file)
assert_equal expected, subject.configuration.pid_file
assert_equal expected, subject.pid_file
end
- should "allow reading/writing its configuration min workers" do
- new_min_workers = Factory.integer
- subject.min_workers(new_min_workers)
- assert_equal new_min_workers, subject.configuration.min_workers
- assert_equal new_min_workers, subject.min_workers
+ should "allow reading/writing its configuration worker class" do
+ new_worker_class = Class.new
+ subject.worker_class(new_worker_class)
+ assert_equal new_worker_class, subject.configuration.worker_class
+ assert_equal new_worker_class, subject.worker_class
end
- should "allow reading/writing its configuration max workers" do
- new_max_workers = Factory.integer
- subject.max_workers(new_max_workers)
- assert_equal new_max_workers, subject.configuration.max_workers
- assert_equal new_max_workers, subject.max_workers
+ should "allow reading/writing its configuration worker params" do
+ new_worker_params = { Factory.string => Factory.string }
+ subject.worker_params(new_worker_params)
+ assert_equal new_worker_params, subject.configuration.worker_params
+ assert_equal new_worker_params, subject.worker_params
end
- should "allow reading/writing its configuration workers" do
+ should "allow reading/writing its configuration num workers" do
+ new_num_workers = Factory.integer
+ subject.num_workers(new_num_workers)
+ assert_equal new_num_workers, subject.configuration.num_workers
+ assert_equal new_num_workers, subject.num_workers
+ end
+
+ should "alias workers as num workers" do
new_workers = Factory.integer
subject.workers(new_workers)
- assert_equal new_workers, subject.configuration.min_workers
- assert_equal new_workers, subject.configuration.max_workers
- assert_equal new_workers, subject.min_workers
- assert_equal new_workers, subject.max_workers
+ assert_equal new_workers, subject.configuration.num_workers
+ assert_equal new_workers, subject.workers
end
- should "allow reading/writing its configuration worker procs" do
- p = proc{}
-
- subject.on_worker_start(&p)
- assert_equal [p], subject.configuration.worker_start_procs
-
- subject.on_worker_shutdown(&p)
- assert_equal [p], subject.configuration.worker_shutdown_procs
-
- subject.on_worker_sleep(&p)
- assert_equal [p], subject.configuration.worker_sleep_procs
-
- subject.on_worker_wakeup(&p)
- assert_equal [p], subject.configuration.worker_wakeup_procs
- end
-
should "allow reading/writing its configuration verbose logging" do
new_verbose = Factory.boolean
subject.verbose_logging(new_verbose)
assert_equal new_verbose, subject.configuration.verbose_logging
assert_equal new_verbose, subject.verbose_logging
@@ -132,24 +126,16 @@
@qs_init_called = false
Assert.stub(Qs, :init){ @qs_init_called = true }
@daemon_class.name Factory.string
@daemon_class.pid_file Factory.file_path
+ @daemon_class.worker_params(Factory.string => Factory.string)
@daemon_class.workers Factory.integer
@daemon_class.verbose_logging Factory.boolean
@daemon_class.shutdown_timeout Factory.integer
@daemon_class.error{ Factory.string }
- @start_procs = Factory.integer(3).times.map{ proc{} }
- @shutdown_procs = Factory.integer(3).times.map{ proc{} }
- @sleep_procs = Factory.integer(3).times.map{ proc{} }
- @wakeup_procs = Factory.integer(3).times.map{ proc{} }
- @start_procs.each { |p| @daemon_class.on_worker_start(&p) }
- @shutdown_procs.each { |p| @daemon_class.on_worker_shutdown(&p) }
- @sleep_procs.each { |p| @daemon_class.on_worker_sleep(&p) }
- @wakeup_procs.each { |p| @daemon_class.on_worker_wakeup(&p) }
-
@queue = Qs::Queue.new do
name(Factory.string)
job 'test', TestHandler.to_s
end
@daemon_class.queue @queue
@@ -157,16 +143,19 @@
@client_spy = nil
Assert.stub(Qs::QsClient, :new) do |*args|
@client_spy = ClientSpy.new(*args)
end
- @worker_pool_spy = nil
- @worker_available = true
- Assert.stub(::DatWorkerPool, :new) do |*args, &block|
- @worker_pool_spy = DatWorkerPool::WorkerPoolSpy.new(*args, &block)
- @worker_pool_spy.worker_available = !!@worker_available
- @worker_pool_spy
+ @worker_available = WorkerAvailable.new
+ Assert.stub(WorkerAvailable, :new){ @worker_available }
+
+ @wp_spy = nil
+ @wp_worker_available = true
+ Assert.stub(DatWorkerPool, :new) do |*args|
+ @wp_spy = DatWorkerPool::WorkerPoolSpy.new(*args)
+ @wp_spy.worker_available = !!@wp_worker_available
+ @wp_spy
end
end
teardown do
@daemon.halt(true) rescue false
end
@@ -174,18 +163,12 @@
end
class InitTests < InitSetupTests
desc "when init"
setup do
- @current_env_process_label = ENV['QS_PROCESS_LABEL']
- ENV['QS_PROCESS_LABEL'] = Factory.string
-
@daemon = @daemon_class.new
end
- teardown do
- ENV['QS_PROCESS_LABEL'] = @current_env_process_label
- end
subject{ @daemon }
should have_readers :daemon_data, :logger
should have_readers :signals_redis_key, :queue_redis_keys
should have_imeths :name, :process_label, :pid_file
@@ -204,20 +187,15 @@
configuration = @daemon_class.configuration
data = subject.daemon_data
assert_instance_of Qs::DaemonData, data
assert_equal configuration.name, data.name
- assert_equal configuration.process_label, data.process_label
assert_equal configuration.pid_file, data.pid_file
- assert_equal configuration.min_workers, data.min_workers
- assert_equal configuration.max_workers, data.max_workers
+ assert_equal configuration.worker_class, data.worker_class
+ assert_equal configuration.worker_params, data.worker_params
+ assert_equal configuration.num_workers, data.num_workers
- assert_equal configuration.worker_start_procs, data.worker_start_procs
- assert_equal configuration.worker_shutdown_procs, data.worker_shutdown_procs
- assert_equal configuration.worker_sleep_procs, data.worker_sleep_procs
- assert_equal configuration.worker_wakeup_procs, data.worker_wakeup_procs
-
assert_equal configuration.verbose_logging, data.verbose_logging
assert_equal configuration.shutdown_timeout, data.shutdown_timeout
assert_equal configuration.error_procs, data.error_procs
assert_equal [@queue.redis_key], data.queue_redis_keys
@@ -242,15 +220,32 @@
should "build a client" do
assert_not_nil @client_spy
exp = Qs.redis_config.merge({
:timeout => 1,
- :size => subject.daemon_data.max_workers + 1
+ :size => subject.daemon_data.num_workers + 1
})
assert_equal exp, @client_spy.redis_config
end
+ should "build a worker pool" do
+ data = subject.daemon_data
+
+ assert_not_nil @wp_spy
+ assert_equal data.worker_class, @wp_spy.worker_class
+ assert_equal data.dwp_logger, @wp_spy.logger
+ assert_equal data.num_workers, @wp_spy.num_workers
+ exp = data.worker_params.merge({
+ :qs_daemon_data => data,
+ :qs_client => @client_spy,
+ :qs_worker_available => @worker_available,
+ :qs_logger => data.logger
+ })
+ assert_equal exp, @wp_spy.worker_params
+ assert_false @wp_spy.start_called
+ end
+
should "not be running by default" do
assert_false subject.running?
end
end
@@ -280,39 +275,20 @@
call = @client_spy.calls.find{ |c| c.command == :clear }
assert_not_nil call
assert_equal [subject.signals_redis_key], call.args
end
- should "build and start a worker pool" do
- assert_not_nil @worker_pool_spy
- assert_equal @daemon_class.min_workers, @worker_pool_spy.min_workers
- assert_equal @daemon_class.max_workers, @worker_pool_spy.max_workers
-
- exp = 1
- assert_equal exp, @worker_pool_spy.on_worker_error_callbacks.size
-
- exp = @start_procs.size
- assert_equal exp, @worker_pool_spy.on_worker_start_callbacks.size
-
- exp = @shutdown_procs.size
- assert_equal exp, @worker_pool_spy.on_worker_shutdown_callbacks.size
-
- exp = @sleep_procs.size + 1 # configured plus 1 internal
- assert_equal exp, @worker_pool_spy.on_worker_sleep_callbacks.size
-
- exp = @wakeup_procs.size
- assert_equal exp, @worker_pool_spy.on_worker_wakeup_callbacks.size
-
- assert_true @worker_pool_spy.start_called
+ should "start its worker pool" do
+ assert_true @wp_spy.start_called
end
end
class RunningWithoutAvailableWorkerTests < InitSetupTests
desc "running without an available worker"
setup do
- @worker_available = false
+ @wp_worker_available = false
@daemon = @daemon_class.new
@thread = @daemon.start
end
subject{ @daemon }
@@ -320,11 +296,11 @@
should "sleep its thread and not add work to its worker pool" do
@thread.join(0.1)
assert_equal 'sleep', @thread.status
@client_spy.append(@queue.redis_key, Factory.string)
@thread.join(0.1)
- assert_empty @worker_pool_spy.work_items
+ assert_empty @wp_spy.work_items
end
end
class RunningWithWorkerAndWorkTests < InitSetupTests
@@ -342,11 +318,11 @@
call = @client_spy.calls.last
assert_equal :block_dequeue, call.command
exp = [subject.signals_redis_key, subject.queue_redis_keys, 0].flatten
assert_equal exp, call.args
exp = Qs::QueueItem.new(@queue.redis_key, @encoded_payload)
- assert_equal exp, @worker_pool_spy.work_items.first
+ assert_equal exp, @wp_spy.work_items.first
end
end
class RunningWithErrorWhileDequeuingTests < InitSetupTests
@@ -398,79 +374,22 @@
assert_equal exp, call.args
end
end
- class WorkerPoolWorkProcTests < InitSetupTests
- desc "worker pool work proc"
- setup do
- @ph_spy = nil
- Assert.stub(Qs::PayloadHandler, :new) do |*args|
- @ph_spy = PayloadHandlerSpy.new(*args)
- end
-
- @daemon = @daemon_class.new
- @thread = @daemon.start
-
- @queue_item = Qs::QueueItem.new(Factory.string, Factory.string)
- @worker_pool_spy.work_proc.call(@queue_item)
- end
- subject{ @daemon }
-
- should "build and run a payload handler" do
- assert_not_nil @ph_spy
- assert_equal subject.daemon_data, @ph_spy.daemon_data
- assert_equal @queue_item, @ph_spy.queue_item
- end
-
- end
-
- class WorkerPoolOnWorkerErrorTests < InitSetupTests
- desc "worker pool on worker error proc"
- setup do
- @daemon = @daemon_class.new
- @thread = @daemon.start
-
- @exception = Factory.exception
- @queue_item = Qs::QueueItem.new(Factory.string, Factory.string)
- @callback = @worker_pool_spy.on_worker_error_callbacks.first
- end
- subject{ @daemon }
-
- should "requeue the queue item if it wasn't started" do
- @queue_item.started = false
- @callback.call('worker', @exception, @queue_item)
- call = @client_spy.calls.detect{ |c| c.command == :prepend }
- assert_not_nil call
- assert_equal @queue_item.queue_redis_key, call.args.first
- assert_equal @queue_item.encoded_payload, call.args.last
- end
-
- should "not requeue the queue item if it was started" do
- @queue_item.started = true
- @callback.call('worker', @exception, @queue_item)
- assert_nil @client_spy.calls.detect{ |c| c.command == :prepend }
- end
-
- should "do nothing if not passed a queue item" do
- assert_nothing_raised{ @callback.call(@exception, nil) }
- end
-
- end
-
class StopTests < StartTests
desc "and then stopped"
setup do
@queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string)
- @worker_pool_spy.add_work(@queue_item)
+ @wp_spy.push(@queue_item)
@daemon.stop true
end
should "shutdown the worker pool" do
- assert_true @worker_pool_spy.shutdown_called
- assert_equal @daemon_class.shutdown_timeout, @worker_pool_spy.shutdown_timeout
+ assert_true @wp_spy.shutdown_called
+ assert_equal @daemon_class.shutdown_timeout, @wp_spy.shutdown_timeout
end
should "requeue any work left on the pool" do
call = @client_spy.calls.last
assert_equal :prepend, call.command
@@ -489,11 +408,11 @@
end
class StopWhileWaitingForWorkerTests < InitSetupTests
desc "stopped while waiting for a worker"
setup do
- @worker_available = false
+ @wp_worker_available = false
@daemon = @daemon_class.new
@thread = @daemon.start
@daemon.stop(true)
end
subject{ @daemon }
@@ -506,18 +425,18 @@
class HaltTests < StartTests
desc "and then halted"
setup do
@queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string)
- @worker_pool_spy.add_work(@queue_item)
+ @wp_spy.push(@queue_item)
@daemon.halt true
end
should "shutdown the worker pool with a 0 timeout" do
- assert_true @worker_pool_spy.shutdown_called
- assert_equal 0, @worker_pool_spy.shutdown_timeout
+ assert_true @wp_spy.shutdown_called
+ assert_equal 0, @wp_spy.shutdown_timeout
end
should "requeue any work left on the pool" do
call = @client_spy.calls.last
assert_equal :prepend, call.command
@@ -536,11 +455,11 @@
end
class HaltWhileWaitingForWorkerTests < InitSetupTests
desc "halted while waiting for a worker"
setup do
- @worker_available = false
+ @wp_worker_available = false
@daemon = @daemon_class.new
@thread = @daemon.start
@daemon.halt(true)
end
subject{ @daemon }
@@ -553,21 +472,22 @@
class WorkLoopErrorTests < StartTests
desc "with a work loop error"
setup do
# cause a non-dequeue error
- Assert.stub(@worker_pool_spy, :worker_available?){ raise RuntimeError }
+ Assert.stub(@wp_spy, :worker_available?){ raise RuntimeError }
- # cause the daemon to loop, its sleeping on the original block_dequeue
+ # cause the daemon to loop, it's sleeping on the original `block_dequeue`
# call that happened before the stub
@queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string)
@client_spy.append(@queue_item.queue_redis_key, @queue_item.encoded_payload)
+ @thread.join
end
should "shutdown the worker pool" do
- assert_true @worker_pool_spy.shutdown_called
- assert_equal @daemon_class.shutdown_timeout, @worker_pool_spy.shutdown_timeout
+ assert_true @wp_spy.shutdown_called
+ assert_equal @daemon_class.shutdown_timeout, @wp_spy.shutdown_timeout
end
should "requeue any work left on the pool" do
call = @client_spy.calls.last
assert_equal :prepend, call.command
@@ -602,18 +522,16 @@
end
end
subject{ @configuration }
should have_options :name, :pid_file
- should have_options :min_workers, :max_workers
+ should have_options :num_workers
should have_options :verbose_logging, :logger
should have_options :shutdown_timeout
- should have_accessors :process_label
should have_accessors :init_procs, :error_procs
+ should have_accessors :worker_class, :worker_params
should have_accessors :queues
- should have_readers :worker_start_procs, :worker_shutdown_procs
- should have_readers :worker_sleep_procs, :worker_wakeup_procs
should have_imeths :routes
should have_imeths :to_hash
should have_imeths :valid?, :validate!
should "be an ns-options proxy" do
@@ -622,45 +540,23 @@
should "default its options and attrs" do
config = Configuration.new
assert_nil config.name
assert_nil config.pid_file
- assert_equal 1, config.min_workers
- assert_equal 4, config.max_workers
+ assert_equal 4, config.num_workers
assert_true config.verbose_logging
assert_instance_of Qs::NullLogger, config.logger
assert_nil subject.shutdown_timeout
- assert_nil config.process_label
assert_equal [], config.init_procs
assert_equal [], config.error_procs
- assert_equal [], subject.worker_start_procs
- assert_equal [], subject.worker_shutdown_procs
- assert_equal [], subject.worker_sleep_procs
- assert_equal [], subject.worker_wakeup_procs
+ assert_equal DefaultWorker, config.worker_class
+ assert_nil config.worker_params
assert_equal [], config.queues
assert_equal [], config.routes
end
- should "prefer an env var for the label but fall back to the name option" do
- current_env_process_label = ENV['QS_PROCESS_LABEL']
-
- ENV['QS_PROCESS_LABEL'] = Factory.string
- config = Configuration.new(:name => Factory.string)
- assert_equal ENV['QS_PROCESS_LABEL'], config.process_label
-
- ENV['QS_PROCESS_LABEL'] = ''
- config = Configuration.new(:name => Factory.string)
- assert_equal config.name, config.process_label
-
- ENV.delete('QS_PROCESS_LABEL')
- config = Configuration.new(:name => Factory.string)
- assert_equal config.name, config.process_label
-
- ENV['QS_PROCESS_LABEL'] = current_env_process_label
- end
-
should "not be valid by default" do
assert_false subject.valid?
end
should "know its routes" do
@@ -668,21 +564,17 @@
end
should "include some attrs (not just the options) in its hash" do
config_hash = subject.to_hash
- assert_equal subject.process_label, config_hash[:process_label]
assert_equal subject.error_procs, config_hash[:error_procs]
+ assert_equal subject.worker_class, config_hash[:worker_class]
+ assert_equal subject.worker_params, config_hash[:worker_params]
assert_equal subject.routes, config_hash[:routes]
exp = subject.queues.map(&:redis_key)
assert_equal exp, config_hash[:queue_redis_keys]
-
- assert_equal subject.worker_start_procs, config_hash[:worker_start_procs]
- assert_equal subject.worker_shutdown_procs, config_hash[:worker_shutdown_procs]
- assert_equal subject.worker_sleep_procs, config_hash[:worker_sleep_procs]
- assert_equal subject.worker_wakeup_procs, config_hash[:worker_wakeup_procs]
end
should "call its init procs when validated" do
called = false
subject.init_procs << proc{ called = true }
@@ -706,10 +598,18 @@
subject.routes.each{ |route| assert_nil route.handler_class }
subject.validate!
subject.routes.each{ |route| assert_not_nil route.handler_class }
end
+ should "validate its worker class when validated" do
+ subject.worker_class = Module.new
+ assert_raises(InvalidError){ subject.validate! }
+
+ subject.worker_class = Class.new
+ assert_raises(InvalidError){ subject.validate! }
+ end
+
should "be valid after being validated" do
assert_false subject.valid?
subject.validate!
assert_true subject.valid?
end
@@ -723,96 +623,59 @@
assert_equal 1, called
end
end
- class SignalTests < UnitTests
- desc "Signal"
+ class StateTests < UnitTests
+ desc "State"
setup do
- @signal = Signal.new(:stop)
+ @state = State.new
end
- subject{ @signal }
+ subject{ @state }
- should have_imeths :set, :start?, :stop?, :halt?
+ should have_imeths :run?, :stop?, :halt?
- should "allow setting it to start" do
- subject.set :start
- assert_true subject.start?
- assert_false subject.stop?
- assert_false subject.halt?
+ should "be a dat-worker-pool locked object" do
+ assert State < DatWorkerPool::LockedObject
end
- should "allow setting it to stop" do
+ should "know if its in the run state" do
+ assert_false subject.run?
+ subject.set :run
+ assert_true subject.run?
+ end
+
+ should "know if its in the stop state" do
+ assert_false subject.stop?
subject.set :stop
- assert_false subject.start?
assert_true subject.stop?
- assert_false subject.halt?
end
- should "allow setting it to halt" do
+ should "know if its in the halt state" do
+ assert_false subject.halt?
subject.set :halt
- assert_false subject.start?
- assert_false subject.stop?
assert_true subject.halt?
end
end
- TestHandler = Class.new
-
- class PayloadHandlerSpy
- attr_reader :daemon_data, :queue_item, :run_called
-
- def initialize(daemon_data, queue_item)
- @daemon_data = daemon_data
- @queue_item = queue_item
- @run_called = false
+ class WorkerAvailableTests < UnitTests
+ desc "WorkerAvailable"
+ setup do
+ @worker_available = WorkerAvailable.new
end
+ subject{ @worker_available }
- def run
- @run_called = true
- end
- end
+ should have_imeths :wait, :signal
- class ClientSpy < Qs::TestClient
- attr_reader :calls
-
- def initialize(*args)
- super
- @calls = []
- @list = []
- @mutex = Mutex.new
- @cv = ConditionVariable.new
+ should "allow waiting and signalling" do
+ thread = Thread.new{ subject.wait }
+ assert_equal 'sleep', thread.status
+ subject.signal
+ assert_equal false, thread.status # dead, done running
end
- def block_dequeue(*args)
- @calls << Call.new(:block_dequeue, args)
- if @list.empty?
- @mutex.synchronize{ @cv.wait(@mutex) }
- end
- @list.shift
- end
-
- def append(*args)
- @calls << Call.new(:append, args)
- @list << args
- @cv.signal
- end
-
- def prepend(*args)
- @calls << Call.new(:prepend, args)
- @list << args
- @cv.signal
- end
-
- def clear(*args)
- @calls << Call.new(:clear, args)
- end
-
- def ping
- @calls << Call.new(:ping)
- end
-
- Call = Struct.new(:command, :args)
end
+
+ TestHandler = Class.new
end