lib/toiler/actor/processor.rb in toiler-0.5.1.pre9 vs lib/toiler/actor/processor.rb in toiler-0.5.1
- old
+ new
@@ -11,23 +11,21 @@
:extend_callback, :executing, :thread
def initialize(queue)
@queue = queue
@worker_class = Toiler.worker_class_registry[queue]
+ @fetcher = Toiler.fetcher queue
@executing = Concurrent::AtomicBoolean.new
@thread = nil
init_options
+ processor_finished
end
def default_executor
Concurrent.global_io_executor
end
- def fetcher
- @fetcher ||= Toiler.fetcher queue
- end
-
def on_message(msg)
method, *args = msg
send(method, *args)
rescue StandardError => e
error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
@@ -89,20 +87,18 @@
fetcher.tell :processor_finished
end
def visibility_extender(queue_visibility, sqs_msg, body)
return unless auto_visibility_timeout?
-
- interval = [1, queue_visibility / 3].max
+ interval = [1,queue_visibility/3].max
Concurrent::TimerTask.execute execution_interval: interval,
- timeout_interval: interval do |task|
+ timeout_interval: interval do
begin
sqs_msg.visibility_timeout = queue_visibility
yield sqs_msg, body if block_given?
rescue StandardError => e
- error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
- task.shutdown if e.message.include?('ReceiptHandle is invalid')
+ error "Processor #{queue} failed to extend visibility of message: #{e.message}\n#{e.backtrace.join("\n")}"
end
end
end
def get_body(sqs_msg)
@@ -118,10 +114,10 @@
when :json then JSON.parse sqs_msg.body
when Proc then body_parser.call sqs_msg
when :text, nil then sqs_msg.body
else body_parser.load sqs_msg.body
end
- rescue StandardError => e
+ rescue => e
raise "Error parsing the message body: #{e.message}"
end
end
end
end