Sha256: cd18aca9e1182baa7ff1ccc09dd6eecc5b7e27f5a0086a614f5f6182acb399cc

Contents?: true

Size: 1.37 KB

Versions: 1

Compression:

Stored size: 1.37 KB

Contents

module Hat
  # Consumes messages from a RabbitMQ queue over a Bunny connection.
  #
  # Every delivery is passed through the configured middleware chain
  # (see #call) and the resulting triple is handed to +perform+.
  # NOTE(review): +perform+ is not defined in this file; presumably a
  # subclass or another mixin supplies it -- confirm.
  class Worker
    include Hat::Configurable

    attr_accessor :id, :logger

    # Defaults for the +worker+ configuration section. The queue defaults
    # are merged with +config.queue+, so values found there take precedence
    # over the ones listed here.
    # NOTE(review): +has_configuration+ and the class-level +config+ are
    # presumably provided by Hat::Configurable -- not visible in this file.
    has_configuration :worker, {
      middlewares: [],
      queue:       { name: 'hat.direct' }.merge(config.queue)
    }

    class << self
      # Sets the name of the queue workers subscribe to.
      #
      # @param name [String] queue name stored in +config.worker.queue.name+
      def queue(name)
        config.worker.queue.name = name
      end
    end

    # @param options [Object] stored in +@options+; not read anywhere in
    #   this file
    def initialize(options)
      @options    = options
      # Built with Bunny's default settings; the connection is started
      # later, in #run.
      @connection = Bunny.new
    end

    # Sets the +@stop+ flag. Nothing in this file reads the flag, so on its
    # own this neither cancels the subscription nor closes the connection.
    def stop
      @stop = true
    end

    # Starts the connection, opens a channel and subscribes to the
    # configured queue. Each delivery is run through #call and the
    # resulting triple is splatted into +perform+.
    #
    # +logger+ must be assigned before calling this method; it is used
    # unconditionally.
    def run
      logger.debug 'Starting rabbit connection'
      @connection.start
      logger.debug 'Creating channel'
      @channel = @connection.create_channel
      logger.info "Subscribing to #{config.worker.queue.name}"
      queue.subscribe do |delivery_info, properties, payload|
        perform(*call(delivery_info, properties, payload))
      end
    end

    # Threads a delivery through every configured middleware, in order.
    #
    # Each middleware receives the current triple via
    # +call(delivery_info, properties, payload)+ and its return value is
    # destructured into the triple passed to the next middleware.
    #
    # @return [Array] +[delivery_info, properties, payload]+ as left by the
    #   last middleware; the arguments unchanged when no middleware is
    #   configured
    def call(delivery_info, properties, payload)
      config.worker.middlewares.each do |middleware|
        delivery_info, properties, payload = middleware.call(delivery_info, properties, payload)
      end

      # Return the triple explicitly: +each+ evaluates to the middleware
      # list itself, which is not what #run splats into +perform+.
      [delivery_info, properties, payload]
    end

    private

    # Memoized direct exchange named 'hat.direct' on the current channel.
    def exchange
      @exchange ||= @channel.direct('hat.direct')
    end

    # Memoized queue named after +config.worker.queue.name+, bound to
    # #exchange with +config.worker.queue.bind_options+ (an empty Hash when
    # that option is not set).
    def queue
      @queue ||= @channel.queue(config.worker.queue.name).bind(exchange, config.worker.queue.bind_options || {})
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
rabbit-hat-0.0.1 lib/hat/worker.rb