test/system/daemon_tests.rb in qs-0.3.0 vs test/system/daemon_tests.rb in qs-0.4.0
- old
+ new
@@ -9,30 +9,43 @@
desc "Qs::Daemon"
setup do
Qs.reset!
@qs_test_mode = ENV['QS_TEST_MODE']
ENV['QS_TEST_MODE'] = nil
+ Qs.config.dispatcher.queue_name = 'qs-app-dispatcher'
+ Qs.config.event_publisher = 'Daemon System Tests'
Qs.init
+ AppQueue.sync_subscriptions
@orig_config = AppDaemon.configuration.to_hash
end
teardown do
@daemon_runner.stop if @daemon_runner
AppDaemon.configuration.apply(@orig_config) # reset daemon config
- Qs.redis.with{ |c| c.del('slow') }
- Qs.redis.with{ |c| c.del('last_error') }
+ Qs.redis.with do |c|
+ keys = c.keys('*qs-app*')
+ c.pipelined{ keys.each{ |k| c.del(k) } }
+ end
Qs.client.clear(AppQueue.redis_key)
+ AppQueue.clear_subscriptions
Qs.reset!
ENV['QS_TEST_MODE'] = @qs_test_mode
end
+ private
+
+ def setup_app_and_dispatcher_daemon
+ @app_daemon = AppDaemon.new
+ @dispatcher_daemon = DispatcherDaemon.new
+ @daemon_runner = DaemonRunner.new(@app_daemon, @dispatcher_daemon)
+ @app_thread = @daemon_runner.start
+ end
+
end
class RunningDaemonSetupTests < SystemTests
setup do
- @daemon = AppDaemon.new
- @daemon_runner = DaemonRunner.new(@daemon)
- @thread = @daemon_runner.start
+ setup_app_and_dispatcher_daemon
end
end
class BasicJobTests < RunningDaemonSetupTests
@@ -41,186 +54,242 @@
@key, @value = [Factory.string, Factory.string]
AppQueue.add('basic', {
'key' => @key,
'value' => @value
})
- @thread.join 0.5
+ @app_thread.join 0.5
end
should "run the job" do
- assert_equal @value, Qs.redis.with{ |c| c.get(@key) }
+ assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") }
end
end
class JobThatErrorsTests < RunningDaemonSetupTests
desc "with a job that errors"
setup do
@error_message = Factory.text
AppQueue.add('error', 'error_message' => @error_message)
- @thread.join 0.5
+ @app_thread.join 0.5
end
should "run the configured error handler procs" do
exp = "RuntimeError: #{@error_message}"
- assert_equal exp, Qs.redis.with{ |c| c.get('last_error') }
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
end
end
class TimeoutJobTests < RunningDaemonSetupTests
desc "with a job that times out"
setup do
AppQueue.add('timeout')
- @thread.join 1 # let the daemon have time to process the job
+ @app_thread.join 1 # let the daemon have time to process the job
end
should "run the configured error handler procs" do
handler_class = AppHandlers::Timeout
exp = "Qs::TimeoutError: #{handler_class} timed out " \
"(#{handler_class.timeout}s)"
- assert_equal exp, Qs.redis.with{ |c| c.get('last_error') }
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
end
end
+ class BasicEventTests < RunningDaemonSetupTests
+ desc "with a basic event added"
+ setup do
+ @key, @value = [Factory.string, Factory.string]
+ Qs.publish('qs-app', 'basic', {
+ 'key' => @key,
+ 'value' => @value
+ })
+ @app_thread.join 0.5
+ end
+
+ should "run the event" do
+ assert_equal @value, Qs.redis.with{ |c| c.get("qs-app:#{@key}") }
+ end
+
+ end
+
+ class EventThatErrorsTests < RunningDaemonSetupTests
+ desc "with an event that errors"
+ setup do
+ @error_message = Factory.text
+ Qs.publish('qs-app', 'error', 'error_message' => @error_message)
+ @app_thread.join 0.5
+ end
+
+ should "run the configured error handler procs" do
+ exp = "RuntimeError: #{@error_message}"
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
+ end
+
+ end
+
+ class TimeoutEventTests < RunningDaemonSetupTests
+ desc "with an event that times out"
+ setup do
+ Qs.publish('qs-app', 'timeout')
+ @app_thread.join 1 # let the daemon have time to process the job
+ end
+
+ should "run the configured error handler procs" do
+ handler_class = AppHandlers::TimeoutEvent
+ exp = "Qs::TimeoutError: #{handler_class} timed out " \
+ "(#{handler_class.timeout}s)"
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
+ end
+
+ end
+
class NoWorkersAvailableTests < SystemTests
desc "when no workers are available"
setup do
AppDaemon.workers 0 # no workers available, don't do this
- @daemon = AppDaemon.new
- @daemon_runner = DaemonRunner.new(@daemon)
- @thread = @daemon_runner.start
+ setup_app_and_dispatcher_daemon
end
should "shutdown when stopped" do
- @daemon.stop
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
+ @app_daemon.stop
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
end
should "shutdown when halted" do
- @daemon.halt
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
+ @app_daemon.halt
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
end
end
class ShutdownWithoutTimeoutTests < SystemTests
desc "without a shutdown timeout"
setup do
AppDaemon.shutdown_timeout nil # disable shutdown timeout
- @daemon = AppDaemon.new
- @daemon_runner = DaemonRunner.new(@daemon)
- @thread = @daemon_runner.start
+ setup_app_and_dispatcher_daemon
AppQueue.add('slow')
- @thread.join 1 # let the daemon have time to process the job
+ Qs.publish('qs-app', 'slow')
+ @app_thread.join 1 # let the daemon have time to process the job and event
end
- should "shutdown and let the job finished" do
- @daemon.stop
- @thread.join 10 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- assert_equal 'finished', Qs.redis.with{ |c| c.get('slow') }
+ should "shutdown and let the job and event finish" do
+ @app_daemon.stop
+ @app_thread.join 10 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow') }
+ assert_equal 'finished', Qs.redis.with{ |c| c.get('qs-app:slow:event') }
end
- should "shutdown and not let the job finished" do
- @daemon.halt
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- assert_nil Qs.redis.with{ |c| c.get('slow') }
+ should "shutdown and not let the job or event finish" do
+ @app_daemon.halt
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
exp = "Qs::ShutdownError"
- assert_equal exp, Qs.redis.with{ |c| c.get('last_error') }
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+ exp = "Qs::ShutdownError"
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
end
class ShutdownWithTimeoutTests < SystemTests
desc "with a shutdown timeout"
setup do
AppDaemon.shutdown_timeout 1
- @daemon = AppDaemon.new
- @daemon_runner = DaemonRunner.new(@daemon)
- @thread = @daemon_runner.start
+ setup_app_and_dispatcher_daemon
AppQueue.add('slow')
- @thread.join 1 # let the daemon have time to process the job
+ Qs.publish('qs-app', 'slow')
+ @app_thread.join 1 # let the daemon have time to process the job and event
end
- should "shutdown and not let the job finished" do
- @daemon.stop
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- assert_nil Qs.redis.with{ |c| c.get('slow') }
+ should "shutdown and not let the job or event finish" do
+ @app_daemon.stop
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
exp = "Qs::ShutdownError"
- assert_equal exp, Qs.redis.with{ |c| c.get('last_error') }
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+ exp = "Qs::ShutdownError"
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
- should "shutdown and not let the job finished" do
- @daemon.halt
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- assert_nil Qs.redis.with{ |c| c.get('slow') }
+ should "shutdown and not let the job or event finish" do
+ @app_daemon.halt
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow') }
exp = "Qs::ShutdownError"
- assert_equal exp, Qs.redis.with{ |c| c.get('last_error') }
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_job_error') }
+ assert_nil Qs.redis.with{ |c| c.get('qs-app:slow:event') }
+ exp = "Qs::ShutdownError"
+ assert_equal exp, Qs.redis.with{ |c| c.get('qs-app:last_event_error') }
end
end
- class ShutdownWithUnprocessedRedisItemTests < SystemTests
- desc "with a redis item that gets picked up but doesn't get processed"
+ class ShutdownWithUnprocessedQueueItemTests < SystemTests
+ desc "with a queue item that gets picked up but doesn't get processed"
setup do
Assert.stub(Qs::PayloadHandler, :new){ sleep 5 }
AppDaemon.shutdown_timeout 1
AppDaemon.workers 2
- @daemon = AppDaemon.new
- @daemon_runner = DaemonRunner.new(@daemon)
- @thread = @daemon_runner.start
+ setup_app_and_dispatcher_daemon
AppQueue.add('slow')
AppQueue.add('slow')
AppQueue.add('basic')
- @thread.join 1 # let the daemon have time to process jobs
+ @app_thread.join 1 # let the daemon have time to process jobs
end
- should "shutdown and requeue the redis item" do
- @daemon.stop
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- # TODO - better way to read whats on a queue
- serialized_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
- names = serialized_payloads.map{ |sp| Qs::Job.parse(Qs.deserialize(sp)).name }
+ should "shutdown and requeue the queue item" do
+ @app_daemon.stop
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
+ names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name }
assert_equal ['basic', 'slow', 'slow'], names
end
- should "shutdown and requeue the redis item" do
- @daemon.halt
- @thread.join 2 # give it time to shutdown, should be faster
- assert_false @thread.alive?
- # TODO - better way to read whats on a queue
- serialized_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
- names = serialized_payloads.map{ |sp| Qs::Job.parse(Qs.deserialize(sp)).name }
+ should "shutdown and requeue the queue item" do
+ @app_daemon.halt
+ @app_thread.join 2 # give it time to shutdown, should be faster
+ assert_false @app_thread.alive?
+ encoded_payloads = Qs.redis.with{ |c| c.lrange(AppQueue.redis_key, 0, 3) }
+ names = encoded_payloads.map{ |sp| Qs::Payload.deserialize(sp).name }
assert_equal ['basic', 'slow', 'slow'], names
end
end
class DaemonRunner
- def initialize(daemon)
- @daemon = daemon
- @thread = nil
+ def initialize(app_daemon, dispatcher_daemon = nil)
+ @app_daemon = app_daemon
+ @dispatcher_daemon = dispatcher_daemon
+ @app_thread = nil
+ @dispatcher_thread = nil
end
def start
- @thread = @daemon.start
+ @app_thread = @app_daemon.start
+ @dispatcher_thread = @dispatcher_daemon.start if @dispatcher_daemon
+ @app_thread
end
def stop
- @daemon.halt
- @thread.join if @thread
+ @app_daemon.halt
+ @dispatcher_daemon.halt if @dispatcher_daemon
+ @app_thread.join if @app_thread
+ @dispatcher_thread.join if @dispatcher_thread
end
end
end