test/system/daemon_tests.rb in qs-0.6.1 vs test/system/daemon_tests.rb in qs-0.7.0
- old
+ new
@@ -1,27 +1,27 @@
require 'assert'
require 'qs/daemon'
-require 'test/support/app_daemon'
+require 'test/support/app_queue'
module Qs::Daemon
class SystemTests < Assert::Context
desc "Qs::Daemon"
setup do
Qs.reset!
@qs_test_mode = ENV['QS_TEST_MODE']
ENV['QS_TEST_MODE'] = nil
- Qs.config.dispatcher.queue_name = 'qs-app-dispatcher'
+ Qs.config.dispatcher_queue_name = 'qs-app-dispatcher'
Qs.config.event_publisher = 'Daemon System Tests'
Qs.init
AppQueue.sync_subscriptions
- @orig_config = AppDaemon.configuration.to_hash
+
+ @app_daemon_class = build_app_daemon_class
end
teardown do
@daemon_runner.stop if @daemon_runner
- AppDaemon.configuration.apply(@orig_config) # reset daemon config
Qs.redis.with do |c|
keys = c.keys('*qs-app*')
c.pipelined{ keys.each{ |k| c.del(k) } }
end
Qs.client.clear(AppQueue.redis_key)
@@ -30,14 +30,46 @@
ENV['QS_TEST_MODE'] = @qs_test_mode
end
private
+ # manually build new anonymous app daemon classes for each run. We do this
+ # both to not mess with global state when tweaking config values for tests
+ # and b/c there is no way to "reset" an existing class's config.
+ def build_app_daemon_class
+ Class.new do
+ include Qs::Daemon
+
+ name 'qs-app'
+
+ logger Logger.new(ROOT_PATH.join('log/app_daemon.log').to_s)
+ logger.datetime_format = "" # turn off the datetime in the logs
+
+ verbose_logging true
+
+ queue AppQueue
+
+ error do |exception, context|
+ return unless (message = context.message)
+ payload_type = message.payload_type
+ route_name = message.route_name
+ case(route_name)
+ when 'error', 'timeout', 'qs-app:error', 'qs-app:timeout'
+ error = "#{exception.class}: #{exception.message}"
+ Qs.redis.with{ |c| c.set("qs-app:last_#{payload_type}_error", error) }
+ when 'slow', 'qs-app:slow'
+ error = exception.class.to_s
+ Qs.redis.with{ |c| c.set("qs-app:last_#{payload_type}_error", error) }
+ end
+ end
+ end
+ end
+
def setup_app_and_dispatcher_daemon
- @app_daemon = AppDaemon.new
- @dispatcher_daemon = DispatcherDaemon.new
- @daemon_runner = DaemonRunner.new(@app_daemon, @dispatcher_daemon)
+ @app_daemon = @app_daemon_class.new
+ @dispatcher_daemon = AppDispatcherDaemon.new
+ @daemon_runner = AppDaemonRunner.new(@app_daemon, @dispatcher_daemon)
@app_thread = @daemon_runner.start
end
end
@@ -54,11 +86,11 @@
@key, @value = [Factory.string, Factory.string]
AppQueue.add('basic', {
'key' => @key,
'value' => @value
})
- @app_thread.join 0.5
+ @app_thread.join(JOIN_SECONDS)
end
should "run the job" do
assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") }
end
@@ -68,11 +100,11 @@
class JobThatErrorsTests < RunningDaemonSetupTests
desc "with a job that errors"
setup do
@error_message = Factory.text
AppQueue.add('error', 'error_message' => @error_message)
- @app_thread.join 0.5
+ @app_thread.join(JOIN_SECONDS)
end
should "run the configured error handler procs" do
exp = "RuntimeError: #{@error_message}"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
@@ -82,11 +114,11 @@
class TimeoutJobTests < RunningDaemonSetupTests
desc "with a job that times out"
setup do
AppQueue.add('timeout')
- @app_thread.join 1 # let the daemon have time to process the job
+ @app_thread.join(AppHandlers::Timeout::TIMEOUT_TIME + JOIN_SECONDS)
end
should "run the configured error handler procs" do
handler_class = AppHandlers::Timeout
exp = "Qs::TimeoutError: #{handler_class} timed out " \
@@ -102,11 +134,11 @@
@key, @value = [Factory.string, Factory.string]
Qs.publish('qs-app', 'basic', {
'key' => @key,
'value' => @value
})
- @app_thread.join 0.5
+ @app_thread.join(JOIN_SECONDS)
end
should "run the event" do
assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") }
end
@@ -116,11 +148,11 @@
class EventThatErrorsTests < RunningDaemonSetupTests
desc "with an event that errors"
setup do
@error_message = Factory.text
Qs.publish('qs-app', 'error', 'error_message' => @error_message)
- @app_thread.join 0.5
+ @app_thread.join(JOIN_SECONDS)
end
should "run the configured error handler procs" do
exp = "RuntimeError: #{@error_message}"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
@@ -130,11 +162,11 @@
class TimeoutEventTests < RunningDaemonSetupTests
desc "with an event that times out"
setup do
Qs.publish('qs-app', 'timeout')
- @app_thread.join 1 # let the daemon have time to process the job
+ @app_thread.join(AppHandlers::Timeout::TIMEOUT_TIME + JOIN_SECONDS)
end
should "run the configured error handler procs" do
handler_class = AppHandlers::TimeoutEvent
exp = "Qs::TimeoutError: #{handler_class} timed out " \
@@ -145,118 +177,143 @@
end
class ShutdownWithoutTimeoutTests < SystemTests
desc "without a shutdown timeout"
setup do
- AppDaemon.shutdown_timeout nil # disable shutdown timeout
+ @app_daemon_class.shutdown_timeout nil # disable shutdown timeout
+ @nil_shutdown_timeout = 10 # something absurdly long, it should be faster
+ # than this but want some timout to keep tests
+ # from hanging in case it never shuts down
+
setup_app_and_dispatcher_daemon
AppQueue.add('slow')
Qs.publish('qs-app', 'slow')
- @app_thread.join 1 # let the daemon have time to process the job and event
+ @app_thread.join(JOIN_SECONDS)
end
should "shutdown and let the job and event finish" do
@app_daemon.stop
- @app_thread.join 10 # give it time to shutdown, should be faster
+ @app_thread.join(@nil_shutdown_timeout)
+
assert_false @app_thread.alive?
assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow') }
assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow:event') }
end
should "shutdown and not let the job or event finish" do
@app_daemon.halt
- @app_thread.join 2 # give it time to shutdown, should be faster
+ @app_thread.join(@nil_shutdown_timeout)
+
assert_false @app_thread.alive?
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
end
class ShutdownWithTimeoutTests < SystemTests
desc "with a shutdown timeout"
setup do
- AppDaemon.shutdown_timeout 1
+ @shutdown_timeout = AppHandlers::Slow::SLOW_TIME * 0.5
+ @app_daemon_class.shutdown_timeout @shutdown_timeout
setup_app_and_dispatcher_daemon
AppQueue.add('slow')
Qs.publish('qs-app', 'slow')
- @app_thread.join 1 # let the daemon have time to process the job and event
+ @app_thread.join(JOIN_SECONDS)
end
should "shutdown and not let the job or event finish" do
@app_daemon.stop
- @app_thread.join 2 # give it time to shutdown, should be faster
+ @app_thread.join(@shutdown_timeout + JOIN_SECONDS)
+
assert_false @app_thread.alive?
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
should "shutdown and not let the job or event finish" do
@app_daemon.halt
- @app_thread.join 2 # give it time to shutdown, should be faster
+ @app_thread.join(@shutdown_timeout + JOIN_SECONDS)
+
assert_false @app_thread.alive?
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+
exp = "Qs::ShutdownError"
assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
end
class ShutdownWithUnprocessedQueueItemTests < SystemTests
desc "with a queue item that gets picked up but doesn't get processed"
setup do
- Assert.stub(Qs::PayloadHandler, :new){ sleep 5 }
+ Assert.stub(Qs::PayloadHandler, :new) do
+ sleep AppHandlers::Slow::SLOW_TIME + JOIN_SECONDS
+ end
- AppDaemon.shutdown_timeout 1
- AppDaemon.workers 2
+ @shutdown_timeout = AppHandlers::Slow::SLOW_TIME * 0.5
+ @app_daemon_class.shutdown_timeout @shutdown_timeout
+ @app_daemon_class.workers 2
setup_app_and_dispatcher_daemon
- AppQueue.add('slow')
- AppQueue.add('slow')
- AppQueue.add('basic')
- @app_thread.join 1 # let the daemon have time to process jobs
+ AppQueue.add('slow1')
+ AppQueue.add('slow2')
+ AppQueue.add('basic1')
+
+ @app_thread.join(JOIN_SECONDS)
end
should "shutdown and requeue the queue item" do
@app_daemon.stop
- @app_thread.join 2 # give it time to shutdown, should be faster
+ @app_thread.join(@shutdown_timeout + JOIN_SECONDS)
+
assert_false @app_thread.alive?
+
encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name }
- assert_equal ['basic', 'slow', 'slow'], names
+
+ ['slow1', 'slow2', 'basic1'].each{ |n| assert_includes n, names }
end
should "shutdown and requeue the queue item" do
@app_daemon.halt
- @app_thread.join 2 # give it time to shutdown, should be faster
+ @app_thread.join(@shutdown_timeout + JOIN_SECONDS)
+
assert_false @app_thread.alive?
- encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
+
+ encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 4) }
names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name }
- assert_equal ['basic', 'slow', 'slow'], names
+
+ ['slow1', 'slow2', 'basic1'].each{ |n| assert_includes n, names }
end
end
class WithEnvProcessLabelTests < SystemTests
desc "with a process label env var"
setup do
ENV['QS_PROCESS_LABEL'] = Factory.string
- @daemon = AppDaemon.new
+ @daemon = @app_daemon_class.new
end
teardown do
ENV.delete('QS_PROCESS_LABEL')
end
subject{ @daemon }
@@ -265,20 +322,44 @@
assert_equal ENV['QS_PROCESS_LABEL'], subject.process_label
end
end
- class DaemonRunner
+ class AppDispatcherDaemon
+ include Qs::Daemon
+
+ name 'qs-app-dispatcher'
+
+ logger Logger.new(ROOT_PATH.join('log/app_dispatcher_daemon.log').to_s)
+ logger.datetime_format = "" # turn off the datetime in the logs
+
+ verbose_logging true
+
+ # we build a "custom" dispatcher because we can't rely on Qs being initialized
+ # when this is required
+ queue Qs::DispatcherQueue.new({
+ :queue_class => Qs.config.dispatcher_queue_class,
+ :queue_name => 'qs-app-dispatcher',
+ :job_name => Qs.config.dispatcher_job_name,
+ :job_handler_class_name => Qs.config.dispatcher_job_handler_class_name
+ })
+ end
+
+ class AppDaemonRunner
def initialize(app_daemon, dispatcher_daemon = nil)
@app_daemon = app_daemon
@dispatcher_daemon = dispatcher_daemon
@app_thread = nil
@dispatcher_thread = nil
end
def start
@app_thread = @app_daemon.start
- @dispatcher_thread = @dispatcher_daemon.start if @dispatcher_daemon
+ @app_thread.join(JOIN_SECONDS)
+ if @dispatcher_daemon
+ @dispatcher_thread = @dispatcher_daemon.start
+ @dispatcher_thread.join(JOIN_SECONDS)
+ end
@app_thread
end
def stop
@app_daemon.halt