Sha256: 7d5684f33c2211524b4a520d3208d68b38f301e09f6863961f9146fbc7b4cc39

Contents?: true

Size: 1.82 KB

Versions: 36

Compression:

Stored size: 1.82 KB

Contents

#!/usr/bin/env ruby
# encoding: utf-8

require "bundler"
Bundler.setup
Bundler.require :default

$:.unshift(File.expand_path("../../../lib", __FILE__))
require 'amqp'

puts "=> Clock example"
puts

# Clock example: one channel publishes the current time to the 'clock' fanout
# exchange once a second, while two other channels consume it through their
# own queues -- one logging every tick, the other only ticks whose seconds
# value is a multiple of five. After 7 seconds (or on INT/TERM) the queues are
# unbound and removed and the connection is closed.
AMQP.start(:host => 'localhost') do |connection|
  puts "Connected!"

  # Prints all of its arguments as a single inspected array.
  def log(*args)
    p args
  end

  # Uncomment to enable the library's logging:
  # AMQP.logging = true

  # --- Producer ------------------------------------------------------------
  channel = AMQP::Channel.new(connection)
  puts "Channel #{channel.id} is now open"
  producer = channel.fanout('clock')
  EM.add_periodic_timer(1) do
    puts

    now = Time.now
    log :publishing, now
    producer.publish(Marshal.dump(now))
  end

  # --- Consumers -----------------------------------------------------------
  # NOTE: Marshal.load must only ever be fed trusted data. The payloads here
  # are produced by the timer above, but anything else publishing to the
  # 'clock' exchange would be deserialized as well.
  channel2 = AMQP::Channel.new(connection)
  exchange = channel2.fanout('clock')

  q1 = channel2.queue('every second')
  q1.bind(exchange).subscribe(:confirm => proc { puts "Subscribed!" }) do |payload|
    log 'every second', :received, Marshal.load(payload)
  end

  puts "channel #{channel2.id} consumer tags: #{channel2.consumers.keys.join(', ')}"

  channel3 = AMQP::Channel.new(connection)
  q2 = channel3.queue('every 5 seconds')
  q2.bind(exchange).subscribe do |payload|
    time = Marshal.load(payload)
    log 'every 5 seconds', :received, time if time.sec % 5 == 0
  end

  # --- Shutdown ------------------------------------------------------------
  # The handler below is registered for INT, TERM and a one-shot timer, so it
  # can be invoked more than once; the flag makes every call after the first
  # a no-op instead of re-issuing unbind/purge/delete on queues that are
  # already being torn down.
  shutting_down = false

  # Kept as a Proc (not a lambda): Signal.trap passes the signal number while
  # EM.add_timer passes nothing, and a Proc tolerates both.
  show_stopper = Proc.new do
    next if shutting_down
    shutting_down = true

    q1.unbind(exchange)
    q2.unbind(exchange) do
      puts "Unbound #{q2.name}."

      q1.purge do |message_count|
        puts "Purged #{q1.name}, there were #{message_count} messages"
        puts "Deleting #{q1.name}…"
        q1.delete(:if_empty => true, :nowait => true)
      end

      q2.delete do |message_count|
        puts "Deleted #{q2.name}. There were #{message_count} messages"
      end

      puts " About to close AMQP connection…"
      connection.close { exit! } unless connection.closing?
    end
  end

  Signal.trap "INT",  show_stopper
  Signal.trap "TERM", show_stopper

  EM.add_timer(7, show_stopper)
end

Version data entries

36 entries across 36 versions & 1 rubygem

Version Path
amqp-0.9.10 examples/legacy/clock.rb
amqp-0.9.9 examples/legacy/clock.rb
amqp-0.9.8 examples/legacy/clock.rb
amqp-0.9.7 examples/legacy/clock.rb
amqp-0.9.6 examples/legacy/clock.rb
amqp-0.9.5 examples/legacy/clock.rb
amqp-0.9.4 examples/legacy/clock.rb
amqp-0.9.3 examples/legacy/clock.rb
amqp-0.9.2 examples/legacy/clock.rb
amqp-0.9.1 examples/legacy/clock.rb
amqp-0.9.0 examples/legacy/clock.rb
amqp-0.9.0.pre3 examples/legacy/clock.rb
amqp-0.9.0.pre2 examples/legacy/clock.rb
amqp-0.9.0.pre1 examples/legacy/clock.rb
amqp-0.8.4 examples/legacy/clock.rb
amqp-0.8.3 examples/legacy/clock.rb
amqp-0.8.2 examples/legacy/clock.rb
amqp-0.8.1 examples/legacy/clock.rb
amqp-0.8.0 examples/legacy/clock.rb
amqp-0.8.0.rc15 examples/legacy/clock.rb