lib/qsagi/confirmed_queue.rb in qsagi-0.1.1 vs lib/qsagi/confirmed_queue.rb in qsagi-0.1.2

- old
+ new

@@ -1,14 +1,17 @@ +require 'thread' + module Qsagi class ConfirmedQueue attr_reader :nacked_messages def initialize(queue) @queue = queue @nacked_messages = [] @unconfirmed_messages = {} @wait_for_confirms = false + @semaphore = Mutex.new end def connect @queue.connect _confirm_select @@ -17,11 +20,14 @@ def disconnect @queue.disconnect end def push(message) - @unconfirmed_messages[_channel.next_publish_seq_no] = message + next_sequence_number = _channel.next_publish_seq_no + @semaphore.synchronize do + @unconfirmed_messages[next_sequence_number] = message + end @queue.push(message) @wait_for_confirms = true end def pop(opts={}) @@ -52,12 +58,16 @@ end end end def _confirm_select - _channel.confirm_select lambda { |delivery_tag, multiple, is_nack| - _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack) - } + callback = lambda do |delivery_tag, multiple, is_nack| + @semaphore.synchronize do + _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack) + end + end + + _channel.confirm_select(callback) end def _wait_for_confirms? @wait_for_confirms end