Sha256: 932f8c05fe48d0e2c998c667eb89a503873995d27fb5da724af9a11c68be503c

Contents?: true

Size: 1.32 KB

Versions: 2

Compression:

Stored size: 1.32 KB

Contents

module RisingDragon
  module SQS
    # Mixin for classes that consume SQS messages through Shoryuken and hand
    # the parsed body to a per-class RisingDragon::SQS::Emitter.
    #
    # Including this module also includes Shoryuken::Worker into the host
    # class and extends the host with {ClassMethods}.
    module Worker
      # Hook run when the module is included.
      #
      # @param base [Module] the class (or module) including this mixin
      def self.included(base)
        base.include(Shoryuken::Worker)
        base.extend(ClassMethods)
      end

      # Class-level configuration helpers added to the including class.
      module ClassMethods
        # Configures the worker's Shoryuken options and registers its queue.
        #
        # The caller's +opt+ hash is merged over the defaults
        # (+queue+, +body_parser: :json+, +auto_delete: true+) before being
        # passed to +shoryuken_options+; the same, unmerged +opt+ is then
        # forwarded to {#register_queue}.
        #
        # @param sqs_queue_name [Object] queue name handed to Shoryuken
        # @param group_name [Object] Shoryuken group the queue is added to
        # @param opt [Hash] overrides; +:concurrency+ and +:weight+ are also
        #   read by {#register_queue}
        # @return [Object] whatever {#register_queue} returns
        def rising_dragon_options(sqs_queue_name, group_name, opt = {})
          defaults = { queue: sqs_queue_name, body_parser: :json, auto_delete: true }
          shoryuken_options(defaults.merge(opt))

          register_queue(sqs_queue_name, group_name, opt)
        end

        # Adds the group and then the queue to Shoryuken's configuration.
        #
        # @param sqs_queue_name [Object] queue name passed to Shoryuken.add_queue
        # @param group_name [Object] group passed to both Shoryuken calls
        # @param option [Hash] +:concurrency+ (falls back to 25 when nil/false)
        #   and +:weight+ (falls back to 1 when nil/false)
        # @return [Object] the return value of Shoryuken.add_queue
        def register_queue(sqs_queue_name, group_name, option)
          Shoryuken.add_group(group_name, option[:concurrency] || 25)
          Shoryuken.add_queue(sqs_queue_name, option[:weight] || 1, group_name)
        end

        # Registers +handle_class+ for +event_name+ on this class's emitter.
        #
        # @return [Object] the return value of the emitter's +register+
        def rising_dragon_register(event_name, handle_class)
          emitter.register(event_name, handle_class)
        end

        # Tells this class's emitter to ignore +event_name+.
        #
        # @return [Object] the return value of the emitter's +ignore+
        def rising_dragon_ignore(event_name)
          emitter.ignore(event_name)
        end

        # The emitter for this class, created on first use and then reused.
        # It is stored in an instance variable of the extended object, so
        # each class that calls this gets its own instance.
        #
        # @return [RisingDragon::SQS::Emitter]
        def emitter
          @emitter ||= ::RisingDragon::SQS::Emitter.new
        end
      end

      # Worker entry point: passes +body+ to the class-level emitter.
      # Any StandardError raised while doing so is routed to {#rescue_from}.
      #
      # @param _sqs_msg [Object] the raw message; unused here
      # @param body [Object] the message body given to +emit_sns_msg+
      # @return [Object] the emitter's result, or {#rescue_from}'s result
      def perform(_sqs_msg, body)
        begin
          self.class.emitter.emit_sns_msg(body)
        rescue StandardError => error
          rescue_from(error)
        end
      end

      # Error hook invoked by {#perform}. The default re-raises the error;
      # including classes may override it to handle errors differently.
      #
      # @param e [StandardError] the error rescued in {#perform}
      # @raise [StandardError] always, in this default implementation
      def rescue_from(e)
        # Explicit argument (not a bare `raise`) so the hook also works when
        # called outside of an active rescue clause.
        raise e
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rising_dragon-0.2.1 lib/rising_dragon/sqs/worker.rb
rising_dragon-0.2.0 lib/rising_dragon/sqs/worker.rb