lib/sqewer/worker.rb in sqewer-6.5.0 vs lib/sqewer/worker.rb in sqewer-6.5.1
- old
+ new
@@ -89,11 +89,11 @@
@logger.info { '[worker] Starting with %d consumer threads' % @num_threads }
@execution_queue = Queue.new
consumers = (1..@num_threads).map do
Thread.new do
- catch(:goodbye) { loop {take_and_execute} }
+ loop { take_and_execute }
end
end
# Create the provider thread. When the execution queue is exhausted,
# grab new messages and place them on the local queue.
@@ -231,11 +231,14 @@
def take_and_execute
message = @execution_queue.pop(nonblock=true)
handle_message(message)
rescue ThreadError # Queue is empty
- throw :goodbye if stopping?
- sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
+ if stopping?
+ Thread.current.exit
+ else
+ sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
+ end
rescue => e # anything else, at or below StandardError that does not need us to quit
@logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..64], e.class, e.message] }
e.backtrace.each { |s| @logger.debug{"\t#{s}"} }
end
end