test/unit/daemon_tests.rb in qs-0.5.0 vs test/unit/daemon_tests.rb in qs-0.6.0

- old
+ new

@@ -1,14 +1,16 @@ require 'assert' require 'qs/daemon' require 'dat-worker-pool/worker_pool_spy' +require 'much-plugin' require 'ns-options/assert_macros' require 'thread' require 'qs/client' require 'qs/queue' require 'qs/queue_item' +require 'test/support/client_spy' module Qs::Daemon class UnitTests < Assert::Context desc "Qs::Daemon" @@ -17,17 +19,20 @@ end subject{ @daemon_class } should have_imeths :configuration should have_imeths :name, :pid_file - should have_imeths :min_workers, :max_workers, :workers - should have_imeths :on_worker_start, :on_worker_shutdown - should have_imeths :on_worker_sleep, :on_worker_wakeup + should have_imeths :worker_class, :worker_params + should have_imeths :num_workers, :workers should have_imeths :verbose_logging, :logger should have_imeths :shutdown_timeout should have_imeths :init, :error, :queue + should "use much-plugin" do + assert_includes MuchPlugin, Qs::Daemon + end + should "know its configuration" do config = subject.configuration assert_instance_of Configuration, config assert_same config, subject.configuration end @@ -45,49 +50,38 @@ expected = Pathname.new(new_pid_file) assert_equal expected, subject.configuration.pid_file assert_equal expected, subject.pid_file end - should "allow reading/writing its configuration min workers" do - new_min_workers = Factory.integer - subject.min_workers(new_min_workers) - assert_equal new_min_workers, subject.configuration.min_workers - assert_equal new_min_workers, subject.min_workers + should "allow reading/writing its configuration worker class" do + new_worker_class = Class.new + subject.worker_class(new_worker_class) + assert_equal new_worker_class, subject.configuration.worker_class + assert_equal new_worker_class, subject.worker_class end - should "allow reading/writing its configuration max workers" do - new_max_workers = Factory.integer - subject.max_workers(new_max_workers) - assert_equal new_max_workers, subject.configuration.max_workers - assert_equal new_max_workers, subject.max_workers + should "allow reading/writing its configuration worker params" do + new_worker_params = { Factory.string => Factory.string } + subject.worker_params(new_worker_params) + assert_equal new_worker_params, subject.configuration.worker_params + assert_equal new_worker_params, subject.worker_params end - should "allow reading/writing its configuration workers" do + should "allow reading/writing its configuration num workers" do + new_num_workers = Factory.integer + subject.num_workers(new_num_workers) + assert_equal new_num_workers, subject.configuration.num_workers + assert_equal new_num_workers, subject.num_workers + end + + should "alias workers as num workers" do new_workers = Factory.integer subject.workers(new_workers) - assert_equal new_workers, subject.configuration.min_workers - assert_equal new_workers, subject.configuration.max_workers - assert_equal new_workers, subject.min_workers - assert_equal new_workers, subject.max_workers + assert_equal new_workers, subject.configuration.num_workers + assert_equal new_workers, subject.workers end - should "allow reading/writing its configuration worker procs" do - p = proc{} - - subject.on_worker_start(&p) - assert_equal [p], subject.configuration.worker_start_procs - - subject.on_worker_shutdown(&p) - assert_equal [p], subject.configuration.worker_shutdown_procs - - subject.on_worker_sleep(&p) - assert_equal [p], subject.configuration.worker_sleep_procs - - subject.on_worker_wakeup(&p) - assert_equal [p], subject.configuration.worker_wakeup_procs - end - should "allow reading/writing its configuration verbose logging" do new_verbose = Factory.boolean subject.verbose_logging(new_verbose) assert_equal new_verbose, subject.configuration.verbose_logging assert_equal new_verbose, subject.verbose_logging @@ -132,24 +126,16 @@ @qs_init_called = false Assert.stub(Qs, :init){ @qs_init_called = true } @daemon_class.name Factory.string @daemon_class.pid_file Factory.file_path + @daemon_class.worker_params(Factory.string => Factory.string) @daemon_class.workers Factory.integer @daemon_class.verbose_logging Factory.boolean @daemon_class.shutdown_timeout Factory.integer @daemon_class.error{ Factory.string } - @start_procs = Factory.integer(3).times.map{ proc{} } - @shutdown_procs = Factory.integer(3).times.map{ proc{} } - @sleep_procs = Factory.integer(3).times.map{ proc{} } - @wakeup_procs = Factory.integer(3).times.map{ proc{} } - @start_procs.each { |p| @daemon_class.on_worker_start(&p) } - @shutdown_procs.each { |p| @daemon_class.on_worker_shutdown(&p) } - @sleep_procs.each { |p| @daemon_class.on_worker_sleep(&p) } - @wakeup_procs.each { |p| @daemon_class.on_worker_wakeup(&p) } - @queue = Qs::Queue.new do name(Factory.string) job 'test', TestHandler.to_s end @daemon_class.queue @queue @@ -157,16 +143,19 @@ @client_spy = nil Assert.stub(Qs::QsClient, :new) do |*args| @client_spy = ClientSpy.new(*args) end - @worker_pool_spy = nil - @worker_available = true - Assert.stub(::DatWorkerPool, :new) do |*args, &block| - @worker_pool_spy = DatWorkerPool::WorkerPoolSpy.new(*args, &block) - @worker_pool_spy.worker_available = !!@worker_available - @worker_pool_spy + @worker_available = WorkerAvailable.new + Assert.stub(WorkerAvailable, :new){ @worker_available } + + @wp_spy = nil + @wp_worker_available = true + Assert.stub(DatWorkerPool, :new) do |*args| + @wp_spy = DatWorkerPool::WorkerPoolSpy.new(*args) + @wp_spy.worker_available = !!@wp_worker_available + @wp_spy end end teardown do @daemon.halt(true) rescue false end @@ -174,18 +163,12 @@ end class InitTests < InitSetupTests desc "when init" setup do - @current_env_process_label = ENV['QS_PROCESS_LABEL'] - ENV['QS_PROCESS_LABEL'] = Factory.string - @daemon = @daemon_class.new end - teardown do - ENV['QS_PROCESS_LABEL'] = @current_env_process_label - end subject{ @daemon } should have_readers :daemon_data, :logger should have_readers :signals_redis_key, :queue_redis_keys should have_imeths :name, :process_label, :pid_file @@ -204,20 +187,15 @@ configuration = @daemon_class.configuration data = subject.daemon_data assert_instance_of Qs::DaemonData, data assert_equal configuration.name, data.name - assert_equal configuration.process_label, data.process_label assert_equal configuration.pid_file, data.pid_file - assert_equal configuration.min_workers, data.min_workers - assert_equal configuration.max_workers, data.max_workers + assert_equal configuration.worker_class, data.worker_class + assert_equal configuration.worker_params, data.worker_params + assert_equal configuration.num_workers, data.num_workers - assert_equal configuration.worker_start_procs, data.worker_start_procs - assert_equal configuration.worker_shutdown_procs, data.worker_shutdown_procs - assert_equal configuration.worker_sleep_procs, data.worker_sleep_procs - assert_equal configuration.worker_wakeup_procs, data.worker_wakeup_procs - assert_equal configuration.verbose_logging, data.verbose_logging assert_equal configuration.shutdown_timeout, data.shutdown_timeout assert_equal configuration.error_procs, data.error_procs assert_equal [@queue.redis_key], data.queue_redis_keys @@ -242,15 +220,32 @@ should "build a client" do assert_not_nil @client_spy exp = Qs.redis_config.merge({ :timeout => 1, - :size => subject.daemon_data.max_workers + 1 + :size => subject.daemon_data.num_workers + 1 }) assert_equal exp, @client_spy.redis_config end + should "build a worker pool" do + data = subject.daemon_data + + assert_not_nil @wp_spy + assert_equal data.worker_class, @wp_spy.worker_class + assert_equal data.dwp_logger, @wp_spy.logger + assert_equal data.num_workers, @wp_spy.num_workers + exp = data.worker_params.merge({ + :qs_daemon_data => data, + :qs_client => @client_spy, + :qs_worker_available => @worker_available, + :qs_logger => data.logger + }) + assert_equal exp, @wp_spy.worker_params + assert_false @wp_spy.start_called + end + should "not be running by default" do assert_false subject.running? end end @@ -280,39 +275,20 @@ call = @client_spy.calls.find{ |c| c.command == :clear } assert_not_nil call assert_equal [subject.signals_redis_key], call.args end - should "build and start a worker pool" do - assert_not_nil @worker_pool_spy - assert_equal @daemon_class.min_workers, @worker_pool_spy.min_workers - assert_equal @daemon_class.max_workers, @worker_pool_spy.max_workers - - exp = 1 - assert_equal exp, @worker_pool_spy.on_worker_error_callbacks.size - - exp = @start_procs.size - assert_equal exp, @worker_pool_spy.on_worker_start_callbacks.size - - exp = @shutdown_procs.size - assert_equal exp, @worker_pool_spy.on_worker_shutdown_callbacks.size - - exp = @sleep_procs.size + 1 # configured plus 1 internal - assert_equal exp, @worker_pool_spy.on_worker_sleep_callbacks.size - - exp = @wakeup_procs.size - assert_equal exp, @worker_pool_spy.on_worker_wakeup_callbacks.size - - assert_true @worker_pool_spy.start_called + should "start its worker pool" do + assert_true @wp_spy.start_called end end class RunningWithoutAvailableWorkerTests < InitSetupTests desc "running without an available worker" setup do - @worker_available = false + @wp_worker_available = false @daemon = @daemon_class.new @thread = @daemon.start end subject{ @daemon } @@ -320,11 +296,11 @@ should "sleep its thread and not add work to its worker pool" do @thread.join(0.1) assert_equal 'sleep', @thread.status @client_spy.append(@queue.redis_key, Factory.string) @thread.join(0.1) - assert_empty @worker_pool_spy.work_items + assert_empty @wp_spy.work_items end end class RunningWithWorkerAndWorkTests < InitSetupTests @@ -342,11 +318,11 @@ call = @client_spy.calls.last assert_equal :block_dequeue, call.command exp = [subject.signals_redis_key, subject.queue_redis_keys, 0].flatten assert_equal exp, call.args exp = Qs::QueueItem.new(@queue.redis_key, @encoded_payload) - assert_equal exp, @worker_pool_spy.work_items.first + assert_equal exp, @wp_spy.work_items.first end end class RunningWithErrorWhileDequeuingTests < InitSetupTests @@ -398,79 +374,22 @@ assert_equal exp, call.args end end - class WorkerPoolWorkProcTests < InitSetupTests - desc "worker pool work proc" - setup do - @ph_spy = nil - Assert.stub(Qs::PayloadHandler, :new) do |*args| - @ph_spy = PayloadHandlerSpy.new(*args) - end - - @daemon = @daemon_class.new - @thread = @daemon.start - - @queue_item = Qs::QueueItem.new(Factory.string, Factory.string) - @worker_pool_spy.work_proc.call(@queue_item) - end - subject{ @daemon } - - should "build and run a payload handler" do - assert_not_nil @ph_spy - assert_equal subject.daemon_data, @ph_spy.daemon_data - assert_equal @queue_item, @ph_spy.queue_item - end - - end - - class WorkerPoolOnWorkerErrorTests < InitSetupTests - desc "worker pool on worker error proc" - setup do - @daemon = @daemon_class.new - @thread = @daemon.start - - @exception = Factory.exception - @queue_item = Qs::QueueItem.new(Factory.string, Factory.string) - @callback = @worker_pool_spy.on_worker_error_callbacks.first - end - subject{ @daemon } - - should "requeue the queue item if it wasn't started" do - @queue_item.started = false - @callback.call('worker', @exception, @queue_item) - call = @client_spy.calls.detect{ |c| c.command == :prepend } - assert_not_nil call - assert_equal @queue_item.queue_redis_key, call.args.first - assert_equal @queue_item.encoded_payload, call.args.last - end - - should "not requeue the queue item if it was started" do - @queue_item.started = true - @callback.call('worker', @exception, @queue_item) - assert_nil @client_spy.calls.detect{ |c| c.command == :prepend } - end - - should "do nothing if not passed a queue item" do - assert_nothing_raised{ @callback.call(@exception, nil) } - end - - end - class StopTests < StartTests desc "and then stopped" setup do @queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string) - @worker_pool_spy.add_work(@queue_item) + @wp_spy.push(@queue_item) @daemon.stop true end should "shutdown the worker pool" do - assert_true @worker_pool_spy.shutdown_called - assert_equal @daemon_class.shutdown_timeout, @worker_pool_spy.shutdown_timeout + assert_true @wp_spy.shutdown_called + assert_equal @daemon_class.shutdown_timeout, @wp_spy.shutdown_timeout end should "requeue any work left on the pool" do call = @client_spy.calls.last assert_equal :prepend, call.command @@ -489,11 +408,11 @@ end class StopWhileWaitingForWorkerTests < InitSetupTests desc "stopped while waiting for a worker" setup do - @worker_available = false + @wp_worker_available = false @daemon = @daemon_class.new @thread = @daemon.start @daemon.stop(true) end subject{ @daemon } @@ -506,18 +425,18 @@ class HaltTests < StartTests desc "and then halted" setup do @queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string) - @worker_pool_spy.add_work(@queue_item) + @wp_spy.push(@queue_item) @daemon.halt true end should "shutdown the worker pool with a 0 timeout" do - assert_true @worker_pool_spy.shutdown_called - assert_equal 0, @worker_pool_spy.shutdown_timeout + assert_true @wp_spy.shutdown_called + assert_equal 0, @wp_spy.shutdown_timeout end should "requeue any work left on the pool" do call = @client_spy.calls.last assert_equal :prepend, call.command @@ -536,11 +455,11 @@ end class HaltWhileWaitingForWorkerTests < InitSetupTests desc "halted while waiting for a worker" setup do - @worker_available = false + @wp_worker_available = false @daemon = @daemon_class.new @thread = @daemon.start @daemon.halt(true) end subject{ @daemon } @@ -553,21 +472,22 @@ class WorkLoopErrorTests < StartTests desc "with a work loop error" setup do # cause a non-dequeue error - Assert.stub(@worker_pool_spy, :worker_available?){ raise RuntimeError } + Assert.stub(@wp_spy, :worker_available?){ raise RuntimeError } - # cause the daemon to loop, its sleeping on the original block_dequeue + # cause the daemon to loop, it's sleeping on the original `block_dequeue` # call that happened before the stub @queue_item = Qs::QueueItem.new(@queue.redis_key, Factory.string) @client_spy.append(@queue_item.queue_redis_key, @queue_item.encoded_payload) + @thread.join end should "shutdown the worker pool" do - assert_true @worker_pool_spy.shutdown_called - assert_equal @daemon_class.shutdown_timeout, @worker_pool_spy.shutdown_timeout + assert_true @wp_spy.shutdown_called + assert_equal @daemon_class.shutdown_timeout, @wp_spy.shutdown_timeout end should "requeue any work left on the pool" do call = @client_spy.calls.last assert_equal :prepend, call.command @@ -602,18 +522,16 @@ end end subject{ @configuration } should have_options :name, :pid_file - should have_options :min_workers, :max_workers + should have_options :num_workers should have_options :verbose_logging, :logger should have_options :shutdown_timeout - should have_accessors :process_label should have_accessors :init_procs, :error_procs + should have_accessors :worker_class, :worker_params should have_accessors :queues - should have_readers :worker_start_procs, :worker_shutdown_procs - should have_readers :worker_sleep_procs, :worker_wakeup_procs should have_imeths :routes should have_imeths :to_hash should have_imeths :valid?, :validate! should "be an ns-options proxy" do @@ -622,45 +540,23 @@ should "default its options and attrs" do config = Configuration.new assert_nil config.name assert_nil config.pid_file - assert_equal 1, config.min_workers - assert_equal 4, config.max_workers + assert_equal 4, config.num_workers assert_true config.verbose_logging assert_instance_of Qs::NullLogger, config.logger assert_nil subject.shutdown_timeout - assert_nil config.process_label assert_equal [], config.init_procs assert_equal [], config.error_procs - assert_equal [], subject.worker_start_procs - assert_equal [], subject.worker_shutdown_procs - assert_equal [], subject.worker_sleep_procs - assert_equal [], subject.worker_wakeup_procs + assert_equal DefaultWorker, config.worker_class + assert_nil config.worker_params assert_equal [], config.queues assert_equal [], config.routes end - should "prefer an env var for the label but fall back to the name option" do - current_env_process_label = ENV['QS_PROCESS_LABEL'] - - ENV['QS_PROCESS_LABEL'] = Factory.string - config = Configuration.new(:name => Factory.string) - assert_equal ENV['QS_PROCESS_LABEL'], config.process_label - - ENV['QS_PROCESS_LABEL'] = '' - config = Configuration.new(:name => Factory.string) - assert_equal config.name, config.process_label - - ENV.delete('QS_PROCESS_LABEL') - config = Configuration.new(:name => Factory.string) - assert_equal config.name, config.process_label - - ENV['QS_PROCESS_LABEL'] = current_env_process_label - end - should "not be valid by default" do assert_false subject.valid? end should "know its routes" do @@ -668,21 +564,17 @@ end should "include some attrs (not just the options) in its hash" do config_hash = subject.to_hash - assert_equal subject.process_label, config_hash[:process_label] assert_equal subject.error_procs, config_hash[:error_procs] + assert_equal subject.worker_class, config_hash[:worker_class] + assert_equal subject.worker_params, config_hash[:worker_params] assert_equal subject.routes, config_hash[:routes] exp = subject.queues.map(&:redis_key) assert_equal exp, config_hash[:queue_redis_keys] - - assert_equal subject.worker_start_procs, config_hash[:worker_start_procs] - assert_equal subject.worker_shutdown_procs, config_hash[:worker_shutdown_procs] - assert_equal subject.worker_sleep_procs, config_hash[:worker_sleep_procs] - assert_equal subject.worker_wakeup_procs, config_hash[:worker_wakeup_procs] end should "call its init procs when validated" do called = false subject.init_procs << proc{ called = true } @@ -706,10 +598,18 @@ subject.routes.each{ |route| assert_nil route.handler_class } subject.validate! subject.routes.each{ |route| assert_not_nil route.handler_class } end + should "validate its worker class when validated" do + subject.worker_class = Module.new + assert_raises(InvalidError){ subject.validate! } + + subject.worker_class = Class.new + assert_raises(InvalidError){ subject.validate! } + end + should "be valid after being validated" do assert_false subject.valid? subject.validate! assert_true subject.valid? end @@ -723,96 +623,59 @@ assert_equal 1, called end end - class SignalTests < UnitTests - desc "Signal" + class StateTests < UnitTests + desc "State" setup do - @signal = Signal.new(:stop) + @state = State.new end - subject{ @signal } + subject{ @state } - should have_imeths :set, :start?, :stop?, :halt? + should have_imeths :run?, :stop?, :halt? - should "allow setting it to start" do - subject.set :start - assert_true subject.start? - assert_false subject.stop? - assert_false subject.halt? + should "be a dat-worker-pool locked object" do + assert State < DatWorkerPool::LockedObject end - should "allow setting it to stop" do + should "know if its in the run state" do + assert_false subject.run? + subject.set :run + assert_true subject.run? + end + + should "know if its in the stop state" do + assert_false subject.stop? subject.set :stop - assert_false subject.start? assert_true subject.stop? - assert_false subject.halt? end - should "allow setting it to halt" do + should "know if its in the halt state" do + assert_false subject.halt? subject.set :halt - assert_false subject.start? - assert_false subject.stop? assert_true subject.halt? end end - TestHandler = Class.new - - class PayloadHandlerSpy - attr_reader :daemon_data, :queue_item, :run_called - - def initialize(daemon_data, queue_item) - @daemon_data = daemon_data - @queue_item = queue_item - @run_called = false + class WorkerAvailableTests < UnitTests + desc "WorkerAvailable" + setup do + @worker_available = WorkerAvailable.new end + subject{ @worker_available } - def run - @run_called = true - end - end + should have_imeths :wait, :signal - class ClientSpy < Qs::TestClient - attr_reader :calls - - def initialize(*args) - super - @calls = [] - @list = [] - @mutex = Mutex.new - @cv = ConditionVariable.new + should "allow waiting and signalling" do + thread = Thread.new{ subject.wait } + assert_equal 'sleep', thread.status + subject.signal + assert_equal false, thread.status # dead, done running end - def block_dequeue(*args) - @calls << Call.new(:block_dequeue, args) - if @list.empty? - @mutex.synchronize{ @cv.wait(@mutex) } - end - @list.shift - end - - def append(*args) - @calls << Call.new(:append, args) - @list << args - @cv.signal - end - - def prepend(*args) - @calls << Call.new(:prepend, args) - @list << args - @cv.signal - end - - def clear(*args) - @calls << Call.new(:clear, args) - end - - def ping - @calls << Call.new(:ping) - end - - Call = Struct.new(:command, :args) end + + TestHandler = Class.new end