Sha256: 8cb85ebb771a48e11bdaa0f4f62f81d37441b67574fb81a30a4c6725beb9b0f7

Contents?: true

Size: 1.53 KB

Versions: 3

Compression:

Stored size: 1.53 KB

Contents

require 'json'

module Shoryuken
  class Processor
    include Celluloid
    include Util

    def initialize(manager)
      @manager = manager
    end

    attr_accessor :proxy_id

    def process(queue, sqs_msg)
      @manager.async.real_thread(proxy_id, Thread.current)

      worker = Shoryuken.worker_registry.fetch_worker(queue, sqs_msg)

        body = get_body(worker.class, sqs_msg)

        worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
          worker.perform(sqs_msg, body)
        end

        @manager.async.processor_done(queue, current_actor)
    end

    private

    def get_body(worker_class, sqs_msg)
      if sqs_msg.is_a? Array
        sqs_msg.map { |m| parse_body(worker_class, m) }
      else
        parse_body(worker_class, sqs_msg)
      end
    end

    def parse_body(worker_class, sqs_msg)
      body_parser = worker_class.get_shoryuken_options['body_parser']

      case body_parser
      when :json
        JSON.parse(sqs_msg.body)
      when Proc
        body_parser.call(sqs_msg)
      when :text, nil
        sqs_msg.body
      else
        if body_parser.respond_to?(:parse)
          # JSON.parse
          body_parser.parse(sqs_msg.body)
        elsif body_parser.respond_to?(:load)
          # see https://github.com/phstc/shoryuken/pull/91
          # JSON.load
          body_parser.load(sqs_msg.body)
        end
      end
    rescue => e
      logger.error { "Error parsing the message body: #{e.message}\nbody_parser: #{body_parser}\nsqs_msg.body: #{sqs_msg.body}" }
      raise
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
shoryuken-2.1.1 lib/shoryuken/processor.rb
shoryuken-2.1.0 lib/shoryuken/processor.rb
shoryuken-2.0.11 lib/shoryuken/processor.rb