lib/toiler/actor/processor.rb in toiler-0.5.1 vs lib/toiler/actor/processor.rb in toiler-0.6.0.pre1
- old
+ new
@@ -11,21 +11,23 @@
:extend_callback, :executing, :thread
def initialize(queue)
@queue = queue
@worker_class = Toiler.worker_class_registry[queue]
- @fetcher = Toiler.fetcher queue
@executing = Concurrent::AtomicBoolean.new
@thread = nil
init_options
- processor_finished
end
def default_executor
Concurrent.global_io_executor
end
+ def fetcher
+ @fetcher ||= Toiler.fetcher queue
+ end
+
def on_message(msg)
method, *args = msg
send(method, *args)
rescue StandardError => e
error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
@@ -87,18 +89,20 @@
fetcher.tell :processor_finished
end
def visibility_extender(queue_visibility, sqs_msg, body)
return unless auto_visibility_timeout?
- interval = [1,queue_visibility/3].max
+
+ interval = [1, queue_visibility / 3].max
Concurrent::TimerTask.execute execution_interval: interval,
- timeout_interval: interval do
+ timeout_interval: interval do |task|
begin
sqs_msg.visibility_timeout = queue_visibility
yield sqs_msg, body if block_given?
rescue StandardError => e
- error "Processor #{queue} failed to extend visibility of message: #{e.message}\n#{e.backtrace.join("\n")}"
+ error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
+ task.shutdown if e.message.include?('ReceiptHandle is invalid')
end
end
end
def get_body(sqs_msg)
@@ -114,10 +118,10 @@
when :json then JSON.parse sqs_msg.body
when Proc then body_parser.call sqs_msg
when :text, nil then sqs_msg.body
else body_parser.load sqs_msg.body
end
- rescue => e
+ rescue StandardError => e
raise "Error parsing the message body: #{e.message}"
end
end
end
end