lib/amqp/client/channel.rb in amqp-client-1.1.5 vs lib/amqp/client/channel.rb in amqp-client-1.1.6

- old
+ new

@@ -20,12 +20,13 @@ @consumers = {} @closed = nil @open = false @on_return = nil @confirm = nil - @unconfirmed = ::Queue.new - @unconfirmed_empty = ::Queue.new + @unconfirmed = [] + @unconfirmed_lock = Mutex.new + @unconfirmed_empty = ConditionVariable.new @basic_gets = ::Queue.new end # Override #inspect # @api private @@ -58,11 +59,11 @@ write_bytes FrameBytes.channel_close(@id, reason, code) @closed = [:channel, code, reason] expect :channel_close_ok @replies.close @basic_gets.close - @unconfirmed_empty.close + @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast } @consumers.each_value(&:close) nil end # Called when channel is closed by broker @@ -71,11 +72,11 @@ # @api private def closed!(level, code, reason, classid, methodid) @closed = [level, code, reason, classid, methodid] @replies.close @basic_gets.close - @unconfirmed_empty.close + @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast } @consumers.each_value(&:close) @consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore nil end @@ -265,16 +266,19 @@ mandatory = properties.delete(:mandatory) || false case properties.delete(:persistent) when true then properties[:delivery_mode] = 2 when false then properties[:delivery_mode] = 1 end - + if @confirm + @unconfirmed_lock.synchronize do + @unconfirmed.push @confirm += 1 + end + end if body.bytesize.between?(1, body_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) - @unconfirmed.push @confirm += 1 if @confirm return end write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties) @@ -283,11 +287,10 @@ len = [body_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) write_bytes FrameBytes.body(id, body_part) pos += len end - @unconfirmed.push @confirm += 1 if @confirm nil end # Publish a message and block until the message has confirmed it has received it # @param (see #basic_publish) @@ -394,45 +397,47 @@ # Put the channel in confirm mode, each published message will then be confirmed by the broker # @param no_wait [Boolean] If false the method will block until the broker has confirmed the request # @return [nil] def confirm_select(no_wait: false) - return if @confirm + return if @confirm # fast path - write_bytes FrameBytes.confirm_select(@id, no_wait) - expect :confirm_select_ok unless no_wait - @confirm = 0 + @unconfirmed_lock.synchronize do + # check again in case another thread already did this while we waited for the lock + return if @confirm + + write_bytes FrameBytes.confirm_select(@id, no_wait) + expect :confirm_select_ok unless no_wait + @confirm = 0 + end nil end # Block until all publishes messages are confirmed - # @return [Boolean] True if all message where positivly acknowledged, false if not + # @return nil def wait_for_confirms - return true if @unconfirmed.empty? - - ok = @unconfirmed_empty.pop - raise Error::Closed.new(@id, *@closed) if ok.nil? - - ok + @unconfirmed_lock.synchronize do + until @unconfirmed.empty? + @unconfirmed_empty.wait(@unconfirmed_lock) + raise Error::Closed.new(@id, *@closed) if @closed + end + end end # Called by Connection when received ack/nack from broker # @api private def confirm(args) - ack_or_nack, delivery_tag, multiple = *args - loop do - tag = @unconfirmed.pop(true) - break if tag == delivery_tag - next if multiple && tag < delivery_tag - - @unconfirmed << tag # requeue - rescue ThreadError - break + _ack_or_nack, delivery_tag, multiple = *args + @unconfirmed_lock.synchronize do + case multiple + when true + idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found") + @unconfirmed.shift(idx + 1) + when false + @unconfirmed.delete(delivery_tag) || raise("Delivery tag not found") + end + @unconfirmed_empty.broadcast if @unconfirmed.empty? end - return unless @unconfirmed.empty? - - ok = ack_or_nack == :ack - @unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero? end # @!endgroup # @!group Transaction