Sha256: c934c5a3ab53d60427329cb6376ed61b282fb953adde781cbcaf84f5193fe3cc

Contents?: true

Size: 1.36 KB

Versions: 3

Compression:

Stored size: 1.36 KB

Contents

module RisingDragon
  module SQS
    module Worker
      def self.included(base)
        base.class_eval do
          include Shoryuken::Worker
        end
        base.extend(ClassMethods)
      end

      module ClassMethods
        def rising_dragon_options(sqs_queue_name, opt = {})
          shoryuken_opt = { queue: sqs_queue_name, body_parser: :json, auto_delete: true, shoryuken_group: "default_group" }.merge(opt)
          shoryuken_options(shoryuken_opt)

          register_queue(sqs_queue_name, shoryuken_opt[:shoryuken_group], opt)
        end

        def register_queue(sqs_queue_name, group_name, option)
          concurrency = option[:concurrency] || 25
          Shoryuken.add_group(group_name, concurrency)

          weight = option[:weight] || 1
          Shoryuken.add_queue(sqs_queue_name, weight, group_name)
        end

        def rising_dragon_register(event_name, handle_class)
          emitter.register(event_name, handle_class)
        end

        def rising_dragon_ignore(event_name)
          emitter.ignore(event_name)
        end

        def emitter
          @emitter ||= ::RisingDragon::SQS::Emitter.new
        end
      end

      def perform(_sqs_msg, body)
        self.class.emitter.emit_sns_msg(body)
      rescue => e
        rescue_from(e)
      end

      def rescue_from(e)
        # overwrite here
        raise e
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
rising_dragon-0.3.2 lib/rising_dragon/sqs/worker.rb
rising_dragon-0.3.1 lib/rising_dragon/sqs/worker.rb
rising_dragon-0.3.0 lib/rising_dragon/sqs/worker.rb