Sha256: 938ba1ad298fecd90e1438e2244a4e17eca7717c4dcea1349d634428ce03bb3d

Contents?: true

Size: 1.03 KB

Versions: 1

Compression:

Stored size: 1.03 KB

Contents

require 'bunny'
require 'connection_pool'
require 'woodhouse/dispatchers/common_amqp_dispatcher'

class Woodhouse::Dispatchers::BunnyDispatcher < Woodhouse::Dispatchers::CommonAmqpDispatcher

  def initialize(config, opts = {}, &blk)
    super
    @pool = new_pool
  end

  private

  def publish_job(job, exchange)
    exchange.publish(job.payload, :headers => job.arguments)
  end

  def run
    retried = false
    @pool.with do |conn|
      yield conn
    end
  rescue Bunny::ClientTimeout => err
    if retried
      raise Woodhouse::ConnectionError, "timed out while contacting AMQP server: #{err.message}"
    else
      new_pool!
      retried = true
      retry
    end
  end

  private

  def new_pool!
    @pool = new_pool
  end

  def new_pool
    @bunny.stop if @bunny

    bunny = @bunny = Bunny.new(@config.server_info || {})
    @bunny.start

    ConnectionPool.new { bunny.create_channel }
  rescue Bunny::TCPConnectionFailed => err
    raise Woodhouse::ConnectionError, "unable to connect to AMQP server: #{err.message}"
  end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
woodhouse-1.0.0 lib/woodhouse/dispatchers/bunny_dispatcher.rb