Sha256: c34c38346b72dd591a2a8b5629653fef58f66b41a82de7480c23dc6a1dc7b35c
Contents?: true
Size: 1.15 KB
Versions: 5
Compression:
Stored size: 1.15 KB
Contents
#!/usr/bin/env ruby # encoding: utf-8 require "rubygems" require 'amqp' def amqp_settings uri = URI.parse(ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/') { :vhost => uri.path, :host => uri.host, :user => uri.user, :port => uri.port || 5672, :pass => uri.password, :heartbeat => 120, :logging => false } rescue Object => e raise "invalid AMQP_URL: (#{uri.inspect}) #{e.class} -> #{e.message}" end p amqp_settings def log(*args) puts args.inspect end EM.run do puts "Running..." AMQP.start(amqp_settings) do |connection| log "Connected to AMQP broker" channel = AMQP::Channel.new(connection) channel.prefetch(1) queue = channel.queue("test.hello.world") exchange = channel.direct queue.bind(exchange) @count = 0 queue.subscribe(:ack => true) do |h, payload| puts "--" EM.defer { # sleep(1) @count += 1 log "Received a message: #{payload} - #{@count}" h.ack } end # queue.subscribe(:ack => false) do |h, payload| # @count += 1 # log "Received a message: #{payload} - #{@count}" # end end end
Version data entries
5 entries across 5 versions & 1 rubygems