test/system/daemon_tests.rb in qs-0.6.1 vs test/system/daemon_tests.rb in qs-0.7.0

- old
+ new

@@ -1,27 +1,27 @@ require 'assert' require 'qs/daemon' -require 'test/support/app_daemon' +require 'test/support/app_queue' module Qs::Daemon class SystemTests < Assert::Context desc "Qs::Daemon" setup do Qs.reset! @qs_test_mode = ENV['QS_TEST_MODE'] ENV['QS_TEST_MODE'] = nil - Qs.config.dispatcher.queue_name = 'qs-app-dispatcher' + Qs.config.dispatcher_queue_name = 'qs-app-dispatcher' Qs.config.event_publisher = 'Daemon System Tests' Qs.init AppQueue.sync_subscriptions - @orig_config = AppDaemon.configuration.to_hash + + @app_daemon_class = build_app_daemon_class end teardown do @daemon_runner.stop if @daemon_runner - AppDaemon.configuration.apply(@orig_config) # reset daemon config Qs.redis.with do |c| keys = c.keys('*qs-app*') c.pipelined{ keys.each{ |k| c.del(k) } } end Qs.client.clear(AppQueue.redis_key) @@ -30,14 +30,46 @@ ENV['QS_TEST_MODE'] = @qs_test_mode end private + # manually build new anonymous app daemon classes for each run. We do this + # both to not mess with global state when tweaking config values for tests + # and b/c there is no way to "reset" an existing class's config. + def build_app_daemon_class + Class.new do + include Qs::Daemon + + name 'qs-app' + + logger Logger.new(ROOT_PATH.join('log/app_daemon.log').to_s) + logger.datetime_format = "" # turn off the datetime in the logs + + verbose_logging true + + queue AppQueue + + error do |exception, context| + return unless (message = context.message) + payload_type = message.payload_type + route_name = message.route_name + case(route_name) + when 'error', 'timeout', 'qs-app:error', 'qs-app:timeout' + error = "#{exception.class}: #{exception.message}" + Qs.redis.with{ |c| c.set("qs-app:last_#{payload_type}_error", error) } + when 'slow', 'qs-app:slow' + error = exception.class.to_s + Qs.redis.with{ |c| c.set("qs-app:last_#{payload_type}_error", error) } + end + end + end + end + def setup_app_and_dispatcher_daemon - @app_daemon = AppDaemon.new - @dispatcher_daemon = DispatcherDaemon.new - @daemon_runner = DaemonRunner.new(@app_daemon, @dispatcher_daemon) + @app_daemon = @app_daemon_class.new + @dispatcher_daemon = AppDispatcherDaemon.new + @daemon_runner = AppDaemonRunner.new(@app_daemon, @dispatcher_daemon) @app_thread = @daemon_runner.start end end @@ -54,11 +86,11 @@ @key, @value = [Factory.string, Factory.string] AppQueue.add('basic', { 'key' => @key, 'value' => @value }) - @app_thread.join 0.5 + @app_thread.join(JOIN_SECONDS) end should "run the job" do assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") } end @@ -68,11 +100,11 @@ class JobThatErrorsTests < RunningDaemonSetupTests desc "with a job that errors" setup do @error_message = Factory.text AppQueue.add('error', 'error_message' => @error_message) - @app_thread.join 0.5 + @app_thread.join(JOIN_SECONDS) end should "run the configured error handler procs" do exp = "RuntimeError: #{@error_message}" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') } @@ -82,11 +114,11 @@ class TimeoutJobTests < RunningDaemonSetupTests desc "with a job that times out" setup do AppQueue.add('timeout') - @app_thread.join 1 # let the daemon have time to process the job + @app_thread.join(AppHandlers::Timeout::TIMEOUT_TIME + JOIN_SECONDS) end should "run the configured error handler procs" do handler_class = AppHandlers::Timeout exp = "Qs::TimeoutError: #{handler_class} timed out " \ @@ -102,11 +134,11 @@ @key, @value = [Factory.string, Factory.string] Qs.publish('qs-app', 'basic', { 'key' => @key, 'value' => @value }) - @app_thread.join 0.5 + @app_thread.join(JOIN_SECONDS) end should "run the event" do assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") } end @@ -116,11 +148,11 @@ class EventThatErrorsTests < RunningDaemonSetupTests desc "with an event that errors" setup do @error_message = Factory.text Qs.publish('qs-app', 'error', 'error_message' => @error_message) - @app_thread.join 0.5 + @app_thread.join(JOIN_SECONDS) end should "run the configured error handler procs" do exp = "RuntimeError: #{@error_message}" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') } @@ -130,11 +162,11 @@ class TimeoutEventTests < RunningDaemonSetupTests desc "with an event that times out" setup do Qs.publish('qs-app', 'timeout') - @app_thread.join 1 # let the daemon have time to process the job + @app_thread.join(AppHandlers::Timeout::TIMEOUT_TIME + JOIN_SECONDS) end should "run the configured error handler procs" do handler_class = AppHandlers::TimeoutEvent exp = "Qs::TimeoutError: #{handler_class} timed out " \ @@ -145,118 +177,143 @@ end class ShutdownWithoutTimeoutTests < SystemTests desc "without a shutdown timeout" setup do - AppDaemon.shutdown_timeout nil # disable shutdown timeout + @app_daemon_class.shutdown_timeout nil # disable shutdown timeout + @nil_shutdown_timeout = 10 # something absurdly long, it should be faster + # than this but want some timout to keep tests + # from hanging in case it never shuts down + setup_app_and_dispatcher_daemon AppQueue.add('slow') Qs.publish('qs-app', 'slow') - @app_thread.join 1 # let the daemon have time to process the job and event + @app_thread.join(JOIN_SECONDS) end should "shutdown and let the job and event finish" do @app_daemon.stop - @app_thread.join 10 # give it time to shutdown, should be faster + @app_thread.join(@nil_shutdown_timeout) + assert_false @app_thread.alive? assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow') } assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow:event') } end should "shutdown and not let the job or event finish" do @app_daemon.halt - @app_thread.join 2 # give it time to shutdown, should be faster + @app_thread.join(@nil_shutdown_timeout) + assert_false @app_thread.alive? assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') } assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') } end end class ShutdownWithTimeoutTests < SystemTests desc "with a shutdown timeout" setup do - AppDaemon.shutdown_timeout 1 + @shutdown_timeout = AppHandlers::Slow::SLOW_TIME * 0.5 + @app_daemon_class.shutdown_timeout @shutdown_timeout setup_app_and_dispatcher_daemon AppQueue.add('slow') Qs.publish('qs-app', 'slow') - @app_thread.join 1 # let the daemon have time to process the job and event + @app_thread.join(JOIN_SECONDS) end should "shutdown and not let the job or event finish" do @app_daemon.stop - @app_thread.join 2 # give it time to shutdown, should be faster + @app_thread.join(@shutdown_timeout + JOIN_SECONDS) + assert_false @app_thread.alive? assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') } assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') } end should "shutdown and not let the job or event finish" do @app_daemon.halt - @app_thread.join 2 # give it time to shutdown, should be faster + @app_thread.join(@shutdown_timeout + JOIN_SECONDS) + assert_false @app_thread.alive? assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') } assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') } + exp = "Qs::ShutdownError" assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') } end end class ShutdownWithUnprocessedQueueItemTests < SystemTests desc "with a queue item that gets picked up but doesn't get processed" setup do - Assert.stub(Qs::PayloadHandler, :new){ sleep 5 } + Assert.stub(Qs::PayloadHandler, :new) do + sleep AppHandlers::Slow::SLOW_TIME + JOIN_SECONDS + end - AppDaemon.shutdown_timeout 1 - AppDaemon.workers 2 + @shutdown_timeout = AppHandlers::Slow::SLOW_TIME * 0.5 + @app_daemon_class.shutdown_timeout @shutdown_timeout + @app_daemon_class.workers 2 setup_app_and_dispatcher_daemon - AppQueue.add('slow') - AppQueue.add('slow') - AppQueue.add('basic') - @app_thread.join 1 # let the daemon have time to process jobs + AppQueue.add('slow1') + AppQueue.add('slow2') + AppQueue.add('basic1') + + @app_thread.join(JOIN_SECONDS) end should "shutdown and requeue the queue item" do @app_daemon.stop - @app_thread.join 2 # give it time to shutdown, should be faster + @app_thread.join(@shutdown_timeout + JOIN_SECONDS) + assert_false @app_thread.alive? + encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) } names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name } - assert_equal ['basic', 'slow', 'slow'], names + + ['slow1', 'slow2', 'basic1'].each{ |n| assert_includes n, names } end should "shutdown and requeue the queue item" do @app_daemon.halt - @app_thread.join 2 # give it time to shutdown, should be faster + @app_thread.join(@shutdown_timeout + JOIN_SECONDS) + assert_false @app_thread.alive? - encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) } + + encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 4) } names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name } - assert_equal ['basic', 'slow', 'slow'], names + + ['slow1', 'slow2', 'basic1'].each{ |n| assert_includes n, names } end end class WithEnvProcessLabelTests < SystemTests desc "with a process label env var" setup do ENV['QS_PROCESS_LABEL'] = Factory.string - @daemon = AppDaemon.new + @daemon = @app_daemon_class.new end teardown do ENV.delete('QS_PROCESS_LABEL') end subject{ @daemon } @@ -265,20 +322,44 @@ assert_equal ENV['QS_PROCESS_LABEL'], subject.process_label end end - class DaemonRunner + class AppDispatcherDaemon + include Qs::Daemon + + name 'qs-app-dispatcher' + + logger Logger.new(ROOT_PATH.join('log/app_dispatcher_daemon.log').to_s) + logger.datetime_format = "" # turn off the datetime in the logs + + verbose_logging true + + # we build a "custom" dispatcher because we can't rely on Qs being initialized + # when this is required + queue Qs::DispatcherQueue.new({ + :queue_class => Qs.config.dispatcher_queue_class, + :queue_name => 'qs-app-dispatcher', + :job_name => Qs.config.dispatcher_job_name, + :job_handler_class_name => Qs.config.dispatcher_job_handler_class_name + }) + end + + class AppDaemonRunner def initialize(app_daemon, dispatcher_daemon = nil) @app_daemon = app_daemon @dispatcher_daemon = dispatcher_daemon @app_thread = nil @dispatcher_thread = nil end def start @app_thread = @app_daemon.start - @dispatcher_thread = @dispatcher_daemon.start if @dispatcher_daemon + @app_thread.join(JOIN_SECONDS) + if @dispatcher_daemon + @dispatcher_thread = @dispatcher_daemon.start + @dispatcher_thread.join(JOIN_SECONDS) + end @app_thread end def stop @app_daemon.halt