lib/qsagi/confirmed_queue.rb in qsagi-0.1.1 vs lib/qsagi/confirmed_queue.rb in qsagi-0.1.2
- old
+ new
@@ -1,14 +1,17 @@
+require 'thread'
+
module Qsagi
class ConfirmedQueue
attr_reader :nacked_messages
def initialize(queue)
@queue = queue
@nacked_messages = []
@unconfirmed_messages = {}
@wait_for_confirms = false
+ @semaphore = Mutex.new
end
def connect
@queue.connect
_confirm_select
@@ -17,11 +20,14 @@
def disconnect
@queue.disconnect
end
def push(message)
- @unconfirmed_messages[_channel.next_publish_seq_no] = message
+ next_sequence_number = _channel.next_publish_seq_no
+ @semaphore.synchronize do
+ @unconfirmed_messages[next_sequence_number] = message
+ end
@queue.push(message)
@wait_for_confirms = true
end
def pop(opts={})
@@ -52,12 +58,16 @@
end
end
end
def _confirm_select
- _channel.confirm_select lambda { |delivery_tag, multiple, is_nack|
- _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack)
- }
+ callback = lambda do |delivery_tag, multiple, is_nack|
+ @semaphore.synchronize do
+ _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack)
+ end
+ end
+
+ _channel.confirm_select(callback)
end
def _wait_for_confirms?
@wait_for_confirms
end