lib/toiler/actor/processor.rb in toiler-0.5.1 vs lib/toiler/actor/processor.rb in toiler-0.6.0.pre1

- old
+ new

@@ -11,21 +11,23 @@ :extend_callback, :executing, :thread def initialize(queue) @queue = queue @worker_class = Toiler.worker_class_registry[queue] - @fetcher = Toiler.fetcher queue @executing = Concurrent::AtomicBoolean.new @thread = nil init_options - processor_finished end def default_executor Concurrent.global_io_executor end + def fetcher + @fetcher ||= Toiler.fetcher queue + end + def on_message(msg) method, *args = msg send(method, *args) rescue StandardError => e error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @@ -87,18 +89,20 @@ fetcher.tell :processor_finished end def visibility_extender(queue_visibility, sqs_msg, body) return unless auto_visibility_timeout? - interval = [1,queue_visibility/3].max + + interval = [1, queue_visibility / 3].max Concurrent::TimerTask.execute execution_interval: interval, - timeout_interval: interval do + timeout_interval: interval do |task| begin sqs_msg.visibility_timeout = queue_visibility yield sqs_msg, body if block_given? rescue StandardError => e - error "Processor #{queue} failed to extend visibility of message: #{e.message}\n#{e.backtrace.join("\n")}" + error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + task.shutdown if e.message.include?('ReceiptHandle is invalid') end end end def get_body(sqs_msg) @@ -114,10 +118,10 @@ when :json then JSON.parse sqs_msg.body when Proc then body_parser.call sqs_msg when :text, nil then sqs_msg.body else body_parser.load sqs_msg.body end - rescue => e + rescue StandardError => e raise "Error parsing the message body: #{e.message}" end end end end