Sha256: 789ab513e4ed1d5984c4101d36156cfdb5a48b4016825fcc843bcfbc215b75f4
Contents?: true
Size: 1.49 KB
Versions: 11
Compression:
Stored size: 1.49 KB
Contents
module Shoryuken class Processor include Util attr_reader :queue, :sqs_msg def self.process(queue, sqs_msg) new(queue, sqs_msg).process end def initialize(queue, sqs_msg) @queue = queue @sqs_msg = sqs_msg end def process return logger.error { "No worker found for #{queue}" } unless worker worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do worker.perform(sqs_msg, body) end rescue Exception => ex logger.error { "Processor failed: #{ex.message}" } logger.error { ex.backtrace.join("\n") } unless ex.backtrace.nil? raise end private def worker @_worker ||= Shoryuken.worker_registry.fetch_worker(queue, sqs_msg) end def worker_class worker.class end def body @_body ||= sqs_msg.is_a?(Array) ? sqs_msg.map(&method(:parse_body)) : parse_body(sqs_msg) end def parse_body(sqs_msg) body_parser = worker_class.get_shoryuken_options['body_parser'] case body_parser when :json JSON.parse(sqs_msg.body) when Proc body_parser.call(sqs_msg) when :text, nil sqs_msg.body else if body_parser.respond_to?(:parse) # JSON.parse body_parser.parse(sqs_msg.body) elsif body_parser.respond_to?(:load) # see https://github.com/phstc/shoryuken/pull/91 # JSON.load body_parser.load(sqs_msg.body) end end end end end
Version data entries
11 entries across 11 versions & 1 rubygems