lib/toiler/actor/processor.rb in toiler-0.5.1.pre9 vs lib/toiler/actor/processor.rb in toiler-0.5.1

- old
+ new

@@ -11,23 +11,21 @@ :extend_callback, :executing, :thread def initialize(queue) @queue = queue @worker_class = Toiler.worker_class_registry[queue] + @fetcher = Toiler.fetcher queue @executing = Concurrent::AtomicBoolean.new @thread = nil init_options + processor_finished end def default_executor Concurrent.global_io_executor end - def fetcher - @fetcher ||= Toiler.fetcher queue - end - def on_message(msg) method, *args = msg send(method, *args) rescue StandardError => e error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @@ -89,20 +87,18 @@ fetcher.tell :processor_finished end def visibility_extender(queue_visibility, sqs_msg, body) return unless auto_visibility_timeout? - - interval = [1, queue_visibility / 3].max + interval = [1,queue_visibility/3].max Concurrent::TimerTask.execute execution_interval: interval, - timeout_interval: interval do |task| + timeout_interval: interval do begin sqs_msg.visibility_timeout = queue_visibility yield sqs_msg, body if block_given? rescue StandardError => e - error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - task.shutdown if e.message.include?('ReceiptHandle is invalid') + error "Processor #{queue} failed to extend visibility of message: #{e.message}\n#{e.backtrace.join("\n")}" end end end def get_body(sqs_msg) @@ -118,10 +114,10 @@ when :json then JSON.parse sqs_msg.body when Proc then body_parser.call sqs_msg when :text, nil then sqs_msg.body else body_parser.load sqs_msg.body end - rescue StandardError => e + rescue => e raise "Error parsing the message body: #{e.message}" end end end end