lib/sqewer/worker.rb in sqewer-6.5.0 vs lib/sqewer/worker.rb in sqewer-6.5.1

- old
+ new

@@ -89,11 +89,11 @@ @logger.info { '[worker] Starting with %d consumer threads' % @num_threads } @execution_queue = Queue.new consumers = (1..@num_threads).map do Thread.new do - catch(:goodbye) { loop {take_and_execute} } + loop { take_and_execute } end end # Create the provider thread. When the execution queue is exhausted, # grab new messages and place them on the local queue. @@ -231,11 +231,14 @@ def take_and_execute message = @execution_queue.pop(nonblock=true) handle_message(message) rescue ThreadError # Queue is empty - throw :goodbye if stopping? - sleep SLEEP_SECONDS_ON_EMPTY_QUEUE + if stopping? + Thread.current.exit + else + sleep SLEEP_SECONDS_ON_EMPTY_QUEUE + end rescue => e # anything else, at or below StandardError that does not need us to quit @logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..64], e.class, e.message] } e.backtrace.each { |s| @logger.debug{"\t#{s}"} } end end