lib/amqp/client/channel.rb in amqp-client-1.1.5 vs lib/amqp/client/channel.rb in amqp-client-1.1.6
- old
+ new
@@ -20,12 +20,13 @@
@consumers = {}
@closed = nil
@open = false
@on_return = nil
@confirm = nil
- @unconfirmed = ::Queue.new
- @unconfirmed_empty = ::Queue.new
+ @unconfirmed = []
+ @unconfirmed_lock = Mutex.new
+ @unconfirmed_empty = ConditionVariable.new
@basic_gets = ::Queue.new
end
# Override #inspect
# @api private
@@ -58,11 +59,11 @@
write_bytes FrameBytes.channel_close(@id, reason, code)
@closed = [:channel, code, reason]
expect :channel_close_ok
@replies.close
@basic_gets.close
- @unconfirmed_empty.close
+ @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
@consumers.each_value(&:close)
nil
end
# Called when channel is closed by broker
@@ -71,11 +72,11 @@
# @api private
def closed!(level, code, reason, classid, methodid)
@closed = [level, code, reason, classid, methodid]
@replies.close
@basic_gets.close
- @unconfirmed_empty.close
+ @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast }
@consumers.each_value(&:close)
@consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore
nil
end
@@ -265,16 +266,19 @@
mandatory = properties.delete(:mandatory) || false
case properties.delete(:persistent)
when true then properties[:delivery_mode] = 2
when false then properties[:delivery_mode] = 1
end
-
+ if @confirm
+ @unconfirmed_lock.synchronize do
+ @unconfirmed.push @confirm += 1
+ end
+ end
if body.bytesize.between?(1, body_max)
write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties),
FrameBytes.body(id, body)
- @unconfirmed.push @confirm += 1 if @confirm
return
end
write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties)
@@ -283,11 +287,10 @@
len = [body_max, body.bytesize - pos].min
body_part = body.byteslice(pos, len)
write_bytes FrameBytes.body(id, body_part)
pos += len
end
- @unconfirmed.push @confirm += 1 if @confirm
nil
end
# Publish a message and block until the message has confirmed it has received it
# @param (see #basic_publish)
@@ -394,45 +397,47 @@
# Put the channel in confirm mode, each published message will then be confirmed by the broker
# @param no_wait [Boolean] If false the method will block until the broker has confirmed the request
# @return [nil]
def confirm_select(no_wait: false)
- return if @confirm
+ return if @confirm # fast path
- write_bytes FrameBytes.confirm_select(@id, no_wait)
- expect :confirm_select_ok unless no_wait
- @confirm = 0
+ @unconfirmed_lock.synchronize do
+ # check again in case another thread already did this while we waited for the lock
+ return if @confirm
+
+ write_bytes FrameBytes.confirm_select(@id, no_wait)
+ expect :confirm_select_ok unless no_wait
+ @confirm = 0
+ end
nil
end
# Block until all publishes messages are confirmed
- # @return [Boolean] True if all message where positivly acknowledged, false if not
+ # @return nil
def wait_for_confirms
- return true if @unconfirmed.empty?
-
- ok = @unconfirmed_empty.pop
- raise Error::Closed.new(@id, *@closed) if ok.nil?
-
- ok
+ @unconfirmed_lock.synchronize do
+ until @unconfirmed.empty?
+ @unconfirmed_empty.wait(@unconfirmed_lock)
+ raise Error::Closed.new(@id, *@closed) if @closed
+ end
+ end
end
# Called by Connection when received ack/nack from broker
# @api private
def confirm(args)
- ack_or_nack, delivery_tag, multiple = *args
- loop do
- tag = @unconfirmed.pop(true)
- break if tag == delivery_tag
- next if multiple && tag < delivery_tag
-
- @unconfirmed << tag # requeue
- rescue ThreadError
- break
+ _ack_or_nack, delivery_tag, multiple = *args
+ @unconfirmed_lock.synchronize do
+ case multiple
+ when true
+ idx = @unconfirmed.index(delivery_tag) || raise("Delivery tag not found")
+ @unconfirmed.shift(idx + 1)
+ when false
+ @unconfirmed.delete(delivery_tag) || raise("Delivery tag not found")
+ end
+ @unconfirmed_empty.broadcast if @unconfirmed.empty?
end
- return unless @unconfirmed.empty?
-
- ok = ack_or_nack == :ack
- @unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero?
end
# @!endgroup
# @!group Transaction