Sha256: c34c38346b72dd591a2a8b5629653fef58f66b41a82de7480c23dc6a1dc7b35c

Contents?: true

Size: 1.15 KB

Versions: 5

Compression:

Stored size: 1.15 KB

Contents

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require 'uri'
require 'amqp'

# Builds the connection options hash handed to AMQP.start.
#
# The broker URL is read from the AMQP_URL environment variable; when it is
# not set, a broker on localhost:5672 with the guest/guest account is used.
#
# @return [Hash] options with the keys :vhost, :host, :user, :port, :pass,
#   :heartbeat and :logging
# @raise [RuntimeError] if the URL cannot be parsed; the message quotes the
#   offending URL together with the underlying error class and message
def amqp_settings
  # Keep the raw string around so the error message can show what was parsed.
  url = ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/'
  uri = URI.parse(url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    # URI does not know a default port for the amqp scheme, so fall back here.
    :port => uri.port || 5672,
    :pass => uri.password,
    :heartbeat => 120,
    :logging => false
  }
rescue StandardError => e
  # Only StandardError is rewrapped; Interrupt, SystemExit and friends must
  # propagate untouched.
  raise "invalid AMQP_URL: (#{url.inspect}) #{e.class} -> #{e.message}"
end

# Dump the resolved connection settings (the hash includes the password) at startup.
p amqp_settings

# Writes all given arguments to standard output as a single inspected array.
#
# @param args [Array<Object>] values to print
# @return [nil]
def log(*args)
  $stdout.puts(args.inspect)
end

# Consumer example: starts the EventMachine reactor, connects to the broker
# described by amqp_settings, binds the "test.hello.world" queue to the
# exchange returned by channel.direct and acknowledges every message after
# passing it through EventMachine's thread pool.
EM.run do
  puts "Running..."
  AMQP.start(amqp_settings) do |connection|
    log "Connected to AMQP broker"

    channel  = AMQP::Channel.new(connection)
    channel.prefetch(1)
    queue    = channel.queue("test.hello.world")
    exchange = channel.direct
    queue.bind(exchange)

    @count = 0

    queue.subscribe(:ack => true) do |h, payload|
      puts "--"

      # Slow or blocking work goes here; EM.defer runs this proc on one of
      # EventMachine's worker threads.
      operation = proc do
        # sleep(1)
        payload
      end

      # EM.defer invokes the callback on the reactor thread with the
      # operation's result, so the shared counter and the ack are only ever
      # touched from the reactor thread rather than from a worker thread.
      callback = proc do |result|
        @count += 1
        log "Received a message: #{result} - #{@count}"
        h.ack
      end

      EM.defer(operation, callback)
    end

    # queue.subscribe(:ack => false) do |h, payload|
    #   @count += 1
    #   log "Received a message: #{payload} - #{@count}"
    # end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
cloudist-0.5.0 examples/amqp/amqp_consumer.rb
cloudist-0.4.4 examples/amqp/amqp_consumer.rb
cloudist-0.4.3 examples/amqp/amqp_consumer.rb
cloudist-0.4.2 examples/amqp/amqp_consumer.rb
cloudist-0.4.1 examples/amqp/amqp_consumer.rb