Sha256: 87499e4ca1ce83c5028caf411016226c4983f9262529cbbcd653dc391801b3ab

Contents?: true

Size: 1.94 KB

Versions: 35

Compression:

Stored size: 1.94 KB

Contents

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "bunny"

puts "=> Subscribing for messages using explicit acknowledgements model"
puts

connection1 = Bunny.new
connection1.start

connection2 = Bunny.new
connection2.start

connection3 = Bunny.new
connection3.start

ch1 = connection1.create_channel
ch1.prefetch(1)

ch2 = connection2.create_channel
ch2.prefetch(1)

ch3 = connection3.create_channel
ch3.prefetch(1)

x   = ch3.direct("amq.direct")
q1  = ch1.queue("bunny.examples.acknowledgements.explicit", :auto_delete => false)
q1.purge

q1.bind(x).subscribe(:ack => true, :block => false) do |delivery_info, properties, payload|
  # do some work
  sleep(0.2)

  # acknowledge some messages, they will be removed from the queue
  if rand > 0.5
    # FYI: there is a shortcut, Bunny::Channel.ack
    ch1.acknowledge(delivery_info.delivery_tag, false)
    puts "[consumer1] Got message ##{properties.headers['i']}, redelivered?: #{delivery_info.redelivered?}, ack-ed"
  else
    # some messages are not ack-ed and will remain in the queue for redelivery
    # when app #1 connection is closed (either properly or due to a crash)
    puts "[consumer1] Got message ##{properties.headers['i']}, SKIPPPED"
  end
end

q2   = ch2.queue("bunny.examples.acknowledgements.explicit", :auto_delete => false)
q2.bind(x).subscribe(:ack => true, :block => false) do |delivery_info, properties, payload|
  # do some work
  sleep(0.2)

  ch2.acknowledge(delivery_info.delivery_tag, false)
  puts "[consumer2] Got message ##{properties.headers['i']}, redelivered?: #{delivery_info.redelivered?}, ack-ed"
end

t1 = Thread.new do
  i = 0
  loop do
    sleep 0.5

    x.publish("Message ##{i}", :headers => { :i => i })
    i += 1
  end
end
t1.abort_on_exception = true

t2 = Thread.new do
  sleep 4.0

  connection1.close
  puts "----- Connection 1 is now closed (we pretend that it has crashed) -----"
end
t2.abort_on_exception = true


sleep 7.0
connection2.close
connection3.close

Version data entries

35 entries across 35 versions & 1 rubygems

Version Path
bunny-0.10.8 examples/guides/queues/redeliveries.rb
bunny-0.10.7 examples/guides/queues/redeliveries.rb
bunny-0.10.6 examples/guides/queues/redeliveries.rb
bunny-0.10.5 examples/guides/queues/redeliveries.rb
bunny-1.0.0.pre5 examples/guides/queues/redeliveries.rb
bunny-0.10.4 examples/guides/queues/redeliveries.rb
bunny-0.10.3 examples/guides/queues/redeliveries.rb
bunny-1.0.0.pre4 examples/guides/queues/redeliveries.rb
bunny-0.10.2 examples/guides/queues/redeliveries.rb
bunny-0.10.1 examples/guides/queues/redeliveries.rb
bunny-0.10.0 examples/guides/queues/redeliveries.rb
bunny-1.0.0.pre3 examples/guides/queues/redeliveries.rb
bunny-0.9.8 examples/guides/queues/redeliveries.rb
bunny-1.0.0.pre2 examples/guides/queues/redeliveries.rb
bunny-0.9.7 examples/guides/queues/redeliveries.rb
bunny-0.9.6 examples/guides/queues/redeliveries.rb
bunny-0.9.5 examples/guides/queues/redeliveries.rb
bunny-0.9.4 examples/guides/queues/redeliveries.rb
bunny-1.0.0.pre1 examples/guides/queues/redeliveries.rb
bunny-0.9.3 examples/guides/queues/redeliveries.rb