Sha256: b8a3b729fa9a819b2073a3d32884761774f64761782b7a85263634b2b7785ce0

Contents?: true

Size: 1019 Bytes

Versions: 1

Compression:

Stored size: 1019 Bytes

Contents

class Sneakers::Queue
  attr_reader :name, :opts, :exchange

  def initialize(name, opts)
    @name = name
    @opts = opts
    @handler_klass = Sneakers::Config[:handler]
  end

  #
  # :exchange
  # :heartbeat_interval
  # :prefetch
  # :durable
  # :ack
  #
  def subscribe(worker)
    @bunny = Bunny.new(:heartbeat_interval => @opts[:heartbeat_interval])
    @bunny.start(@opts[:amqp])

    @channel = @bunny.create_channel
    @channel.prefetch(@opts[:prefetch])

    @exchange = @channel.exchange(@opts[:exchange], :type => :direct, :durable => @opts[:durable])
    handler = @handler_klass.new(@channel)

    queue = @channel.queue(@name, :durable => @opts[:durable])
    queue.bind(@exchange, :routing_key => @name)

    @consumer = queue.subscribe(:block => false, :ack => @opts[:ack]) do | hdr, props, msg | 
      worker.do_work(hdr, props, msg, handler)
    end
    nil
  end

  def unsubscribe
    # XXX can we cancel bunny and channel too?
    @consumer.cancel if @consumer
    @consumer = nil
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
sneakers-0.0.3 lib/sneakers/queue.rb