Sha256: 4530d1ced1231e85fd46a138e361011364fcdd3b6e7231d82e5c77d1ea714868

Contents?: true

Size: 1.03 KB

Versions: 5

Compression:

Stored size: 1.03 KB

Contents

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require 'uri'
require 'amqp'

# Builds the connection options hash that is passed to AMQP.start.
#
# The broker URL is read from the AMQP_URL environment variable; when it is
# not set, a local broker with guest credentials on port 5672 is used.
#
# @return [Hash] options with the keys :vhost, :host, :user, :port, :pass,
#   :heartbeat and :logging
# @raise [RuntimeError] if the URL cannot be parsed
def amqp_settings
  # Keep the raw string in its own local so it can be reported on failure.
  url = ENV["AMQP_URL"] || 'amqp://guest:guest@localhost:5672/'
  uri = URI.parse(url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    :port => uri.port || 5672, # URLs without an explicit port parse to nil
    :pass => uri.password,
    :heartbeat => 120,
    :logging => false
  }
rescue StandardError => e
  # Only StandardError is rescued so Interrupt/SystemExit still propagate.
  # The raw URL is shown because no parsed URI exists when parsing fails.
  raise "invalid AMQP_URL: (#{url.inspect}) #{e.class} -> #{e.message}"
end

# Debug output: dump the resolved connection settings (the hash includes the
# :pass entry, so the password is printed too).
p(amqp_settings)

# Writes all given arguments to standard output as a single line, using the
# inspect form of the argument array.
#
# @param args [Array<Object>] values to log
# @return [nil]
def log(*args)
  rendered = args.inspect
  $stdout.puts(rendered)
end

# Starts the EventMachine reactor, connects to the broker described by
# amqp_settings, binds the "test.hello.world" queue to a direct exchange and
# publishes 10,000 "Hello, world!" messages to that exchange.
EM.run do
  puts "Running..."
  AMQP.start(amqp_settings) do |connection|
    log "Connected to AMQP broker"

    channel  = AMQP::Channel.new(connection)
    channel.prefetch(1)
    queue    = channel.queue("test.hello.world")
    exchange = channel.direct
    queue.bind(exchange)

    # The loop below calls sleep, so it runs via EM.defer instead of
    # blocking the reactor thread.
    EM.defer do
      10_000.times do |i|
        log "Publishing message #{i + 1}"
        # Pause for a second before every 1000th message (including the first).
        if (i % 1000).zero?
          puts "Sleeping..."
          sleep(1)
        end
        payload = "Hello, world! - #{i + 1}"
        # Hand the publish back to the reactor thread: the channel and its
        # connection are driven by EventMachine and should not be touched
        # directly from a deferred worker thread.
        EM.next_tick { exchange.publish payload } # , :routing_key => queue.name
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
cloudist-0.5.0 examples/amqp/amqp_publisher.rb
cloudist-0.4.4 examples/amqp/amqp_publisher.rb
cloudist-0.4.3 examples/amqp/amqp_publisher.rb
cloudist-0.4.2 examples/amqp/amqp_publisher.rb
cloudist-0.4.1 examples/amqp/amqp_publisher.rb