lib/sqewer/worker.rb in sqewer-3.0.1 vs lib/sqewer/worker.rb in sqewer-4.0.0
- old
+ new
@@ -93,12 +93,12 @@
@execution_queue = Queue.new
consumers = (1..@num_threads).map do
Thread.new do
loop {
- break if @state.in_state?(:stopping)
take_and_execute
+ break if stopping?
}
end
end
# Create the provider thread. When the execution queue is exhausted,
@@ -178,23 +178,19 @@
def handle_message(message)
return unless message.receipt_handle
return @connection.delete_message(message.receipt_handle) unless message.has_body?
@isolator.perform(self, message)
- @connection.delete_message(message.receipt_handle)
+ # The message delete happens within the Isolator
end
def take_and_execute
message = @execution_queue.pop(nonblock=true)
handle_message(message)
rescue ThreadError # Queue is empty
sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
Thread.pass
rescue => e # anything else, at or below StandardError that does not need us to quit
- @logger.error { "[worker] Failed #{message.inspect} with #{e}" }
- @logger.error(e.class)
- @logger.error(e.message)
+ @logger.error { "[worker] Failed %s... with %s: %s" % [message.inspect[0..64], e.class, e.message] }
e.backtrace.each { |s| @logger.error{"\t#{s}"} }
end
-
- STR_logger = 'logger'
end