lib/toiler/actor/processor.rb in toiler-0.6.1 vs lib/toiler/actor/processor.rb in toiler-0.7.0

- old
+ new

@@ -1,18 +1,22 @@ +# frozen_string_literal: true + require 'json' require 'toiler/actor/utils/actor_logging' module Toiler module Actor # Responsible for processing sqs messages and notifying Fetcher when done class Processor < Concurrent::Actor::RestartingContext include Utils::ActorLogging - attr_accessor :queue, :worker_class, :fetcher, :body_parser, - :extend_callback, :executing, :thread + attr_reader :queue, :worker_class, :body_parser, + :executing, :thread def initialize(queue) + super() + @queue = queue @worker_class = Toiler.worker_class_registry[queue] @executing = false @thread = nil init_options @@ -21,109 +25,103 @@ def default_executor Concurrent.global_io_executor end def fetcher - @fetcher ||= Toiler.fetcher queue + @fetcher ||= Toiler.fetcher @queue end def on_message(msg) method, *args = msg send(method, *args) rescue StandardError, SystemStackError => e - # rescue SystemStackError, if clients misbehave and cause a stack level too deep exception, we should be able to recover - error "Processor #{queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + # if clients misbehave and cause a stack level too deep exception, we should be able to recover + error "Processor #{@queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" end def executing? @executing end private def init_options - @auto_visibility_timeout = @worker_class.auto_visibility_timeout? + @deadline_extension = @worker_class.auto_visibility_timeout? || @worker_class.deadline_extension? @auto_delete = @worker_class.auto_delete? toiler_options = @worker_class.toiler_options @body_parser = toiler_options[:parser] - @extend_callback = toiler_options[:on_visibility_extend] end - def auto_visibility_timeout? - @auto_visibility_timeout + def deadline_extension? + @deadline_extension end def auto_delete? @auto_delete end - def process(visibility, sqs_msg) + def process(ack_deadline, msg) process_init worker = @worker_class.new - body = get_body(sqs_msg) - timer = visibility_extender visibility, sqs_msg, body, &extend_callback + body = get_body(msg) + timer = deadline_extender ack_deadline, msg, body if deadline_extension? - debug "Worker #{queue} starts performing..." - worker.perform sqs_msg, body - debug "Worker #{queue} finishes performing..." - sqs_msg.delete if auto_delete? + debug "Worker #{@queue} starts performing..." + worker.perform msg, body + debug "Worker #{@queue} finishes performing..." + msg.delete if auto_delete? ensure process_cleanup timer end def process_init @executing = true @thread = Thread.current - debug "Processor #{queue} begins processing..." + debug "Processor #{@queue} begins processing..." end def process_cleanup(timer) - debug "Processor #{queue} starts cleanup after perform..." - timer.shutdown if timer + debug "Processor #{@queue} starts cleanup after perform..." + timer&.shutdown ::ActiveRecord::Base.clear_active_connections! if defined? ActiveRecord processor_finished @executing = false @thread = nil - debug "Processor #{queue} finished cleanup after perform..." + debug "Processor #{@queue} finished cleanup after perform..." end def processor_finished fetcher.tell :processor_finished end - def visibility_extender(queue_visibility, sqs_msg, body) - return unless auto_visibility_timeout? - - interval = [1, queue_visibility / 3].max - Concurrent::TimerTask.execute execution_interval: interval, - timeout_interval: interval do |task| - begin - sqs_msg.visibility_timeout = queue_visibility - yield sqs_msg, body if block_given? - rescue StandardError => e - error "Processor #{queue} failed to extend visibility of message - #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" - task.shutdown if e.message.include?('ReceiptHandle is invalid') - end + def deadline_extender(ack_deadline, msg, _body) + interval = [1, ack_deadline / 3].max + Concurrent::TimerTask.execute execution_interval: interval do |task| + msg.modify_ack_deadline! ack_deadline + rescue StandardError => e + error "Processor #{@queue} failed to extend ack deadline of message " \ + "- #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" + task.shutdown if e.message.include?('ReceiptHandle is invalid') end end - def get_body(sqs_msg) - if sqs_msg.is_a? Array - sqs_msg.map { |m| parse_body m } + def get_body(msg) + if msg.is_a? Array + msg.map { |m| parse_body m } else - parse_body sqs_msg + parse_body msg end end - def parse_body(sqs_msg) - case body_parser - when :json then JSON.parse sqs_msg.body - when Proc then body_parser.call sqs_msg - when :text, nil then sqs_msg.body - else body_parser.load sqs_msg.body + def parse_body(msg) + case @body_parser + when :json then JSON.parse msg.body + when Proc then @body_parser.call msg + when :text, nil then msg.body + else @body_parser.load msg.body end rescue StandardError => e - raise "Error parsing the message body: #{e.message}" + raise "Error parsing the message body: #{e.message} - #{msg.body}" end end end end