Sha256: 32483adba4cdfcf4f76c34cae8376c6172c3e8701049f2d938484a3d830ef1a8

Contents?: true

Size: 1.38 KB

Versions: 5

Compression:

Stored size: 1.38 KB

Contents

module Shoryuken
  class Processor
    include Util

    def initialize(manager)
      @manager = manager
    end

    def process(queue, sqs_msg)
      worker = Shoryuken.worker_registry.fetch_worker(queue, sqs_msg)
      body = get_body(worker.class, sqs_msg)

      worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
        worker.perform(sqs_msg, body)
      end
    ensure
      @manager.processor_done(queue)
    end

    private

    def get_body(worker_class, sqs_msg)
      if sqs_msg.is_a? Array
        sqs_msg.map { |m| parse_body(worker_class, m) }
      else
        parse_body(worker_class, sqs_msg)
      end
    end

    def parse_body(worker_class, sqs_msg)
      body_parser = worker_class.get_shoryuken_options['body_parser']

      case body_parser
      when :json
        JSON.parse(sqs_msg.body)
      when Proc
        body_parser.call(sqs_msg)
      when :text, nil
        sqs_msg.body
      else
        if body_parser.respond_to?(:parse)
          # JSON.parse
          body_parser.parse(sqs_msg.body)
        elsif body_parser.respond_to?(:load)
          # see https://github.com/phstc/shoryuken/pull/91
          # JSON.load
          body_parser.load(sqs_msg.body)
        end
      end
    rescue => e
      logger.error { "Error parsing the message body: #{e.message}\nbody_parser: #{body_parser}\nsqs_msg.body: #{sqs_msg.body}" }
      raise
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
shoryuken-3.0.4 lib/shoryuken/processor.rb
shoryuken-3.0.3 lib/shoryuken/processor.rb
shoryuken-3.0.2 lib/shoryuken/processor.rb
shoryuken-3.0.1 lib/shoryuken/processor.rb
shoryuken-3.0.0 lib/shoryuken/processor.rb