Sha256: 465113cef14a2870ba0f82575140c0a3bf40977f3844f0e75d36655fa466b4f1

Contents?: true

Size: 1.95 KB

Versions: 21

Compression:

Stored size: 1.95 KB

Contents

#!/usr/bin/env ruby
# encoding: utf-8

require "rubygems"
require "bunny"

puts "=> Subscribing for messages using explicit acknowledgements model"
puts

connection1 = Bunny.new
connection1.start

connection2 = Bunny.new
connection2.start

connection3 = Bunny.new
connection3.start

ch1 = connection1.create_channel
ch1.prefetch(1)

ch2 = connection2.create_channel
ch2.prefetch(1)

ch3 = connection3.create_channel
ch3.prefetch(1)

x   = ch3.direct("amq.direct")
q1  = ch1.queue("bunny.examples.acknowledgements.explicit", :auto_delete => false)
q1.purge

q1.bind(x).subscribe(:manual_ack => true, :block => false) do |delivery_info, properties, payload|
  # do some work
  sleep(0.2)

  # acknowledge some messages, they will be removed from the queue
  if rand > 0.5
    # FYI: there is a shortcut, Bunny::Channel.ack
    ch1.acknowledge(delivery_info.delivery_tag, false)
    puts "[consumer1] Got message ##{properties.headers['i']}, redelivered?: #{delivery_info.redelivered?}, ack-ed"
  else
    # some messages are not ack-ed and will remain in the queue for redelivery
    # when app #1 connection is closed (either properly or due to a crash)
    puts "[consumer1] Got message ##{properties.headers['i']}, SKIPPED"
  end
end

q2   = ch2.queue("bunny.examples.acknowledgements.explicit", :auto_delete => false)
q2.bind(x).subscribe(:manual_ack => true, :block => false) do |delivery_info, properties, payload|
  # do some work
  sleep(0.2)

  ch2.acknowledge(delivery_info.delivery_tag, false)
  puts "[consumer2] Got message ##{properties.headers['i']}, redelivered?: #{delivery_info.redelivered?}, ack-ed"
end

t1 = Thread.new do
  i = 0
  loop do
    sleep 0.5

    x.publish("Message ##{i}", :headers => { :i => i })
    i += 1
  end
end
t1.abort_on_exception = true

t2 = Thread.new do
  sleep 4.0

  connection1.close
  puts "----- Connection 1 is now closed (we pretend that it has crashed) -----"
end
t2.abort_on_exception = true


sleep 7.0
connection2.close
connection3.close

Version data entries

21 entries across 21 versions & 1 rubygems

Version Path
bunny-2.2.2 examples/guides/queues/redeliveries.rb
bunny-2.2.1 examples/guides/queues/redeliveries.rb
bunny-2.2.0 examples/guides/queues/redeliveries.rb
bunny-1.7.1 examples/guides/queues/redeliveries.rb
bunny-2.1.0 examples/guides/queues/redeliveries.rb
bunny-2.0.1 examples/guides/queues/redeliveries.rb
bunny-2.0.0 examples/guides/queues/redeliveries.rb
bunny-2.0.0.rc2 examples/guides/queues/redeliveries.rb
bunny-2.0.0.rc1 examples/guides/queues/redeliveries.rb
bunny-1.7.0 examples/guides/queues/redeliveries.rb
bunny-1.6.3 examples/guides/queues/redeliveries.rb
bunny-1.6.2 examples/guides/queues/redeliveries.rb
bunny-1.6.1 examples/guides/queues/redeliveries.rb
bunny-1.6.0 examples/guides/queues/redeliveries.rb
bunny-1.5.1 examples/guides/queues/redeliveries.rb
bunny-1.6.0.rc2 examples/guides/queues/redeliveries.rb
bunny-1.6.0.rc1 examples/guides/queues/redeliveries.rb
bunny-1.6.0.pre1 examples/guides/queues/redeliveries.rb
bunny-1.5.0 examples/guides/queues/redeliveries.rb
bunny-1.5.0.pre2 examples/guides/queues/redeliveries.rb