Sha256: b8890fa70342e96c8c034c98236d61308ff3c491c4e487446f56fd171e1aba3f

Contents?: true

Size: 1.44 KB

Versions: 4

Compression:

Stored size: 1.44 KB

Contents

require 'carrot'
module MessageQueue
  class Rabbit < Base

    def initialize(opts={})
      @servers     = opts['servers']
      @host, @port = @servers.first.split(':')
      @port        = @port.to_i
      @opts        = opts
    end

    def delete(queue)
      send_command do
        client.queue(queue).delete
      end
    end

    def queue_size(queue)
      send_command do
        client.queue(queue).message_count
      end
    end

    def enqueue(queue, data)
      send_command do 
        client.queue(queue, :durable => true).publish(Marshal.dump(data), :persistent => true)
      end
    end

    def dequeue(queue)
      send_command do
        task = client.queue(queue).pop(:ack => true)
        return unless task
        Marshal.load(task)
      end
    end

    def confirm(queue)
      send_command do
        client.queue(queue).ack
      end
    end

    def send_command(&block)
      retried = false
      begin
        block.call
      rescue Carrot::AMQP::Server::ServerDown => e
        if not retried
          puts "Error #{e.message}. Retrying..."
          @client = nil
          retried = true
          retry
        else
          raise e
        end
      end
    end

    def client
      @client ||= Carrot.new(
        :host  => @host, 
        :port  => @port, 
        :user  => @opts['user'], 
        :pass  => @opts['pass'], 
        :vhost => @opts['vhost']
      ) 
    end

    def stop
      client.stop
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
famoseagle-sweat_shop-0.7.0 lib/message_queue/rabbit.rb
famoseagle-sweat_shop-0.8.0 lib/message_queue/rabbit.rb
famoseagle-sweat_shop-0.8.1 lib/message_queue/rabbit.rb
famoseagle-sweat_shop-0.8.2 lib/message_queue/rabbit.rb