lib/amqp/client/channel.rb in amqp-client-1.0.2 vs lib/amqp/client/channel.rb in amqp-client-1.1.0

- old
+ new

@@ -213,10 +213,11 @@ # @param arguments [Hash] Arguments matching the binding that's being removed # @return [nil] def queue_unbind(name, exchange, binding_key, arguments: {}) write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments) expect :queue_unbind_ok + nil end # @!endgroup # @!group Basic @@ -409,15 +410,14 @@ # Block until all publishes messages are confirmed # @return [Boolean] True if all message where positivly acknowledged, false if not def wait_for_confirms return true if @unconfirmed.empty? - case @unconfirmed_empty.pop - when true then true - when false then false - else raise Error::Closed.new(@id, *@closed) - end + ok = @unconfirmed_empty.pop + raise Error::Closed.new(@id, *@closed) if ok.nil? + + ok end # Called by Connection when received ack/nack from broker # @api private def confirm(args) @@ -431,13 +431,12 @@ rescue ThreadError break end return unless @unconfirmed.empty? - @unconfirmed_empty.num_waiting.times do - @unconfirmed_empty << (ack_or_nack == :ack) - end + ok = ack_or_nack == :ack + @unconfirmed_empty.push(ok) until @unconfirmed_empty.num_waiting.zero? end # @!endgroup # @!group Transaction @@ -472,16 +471,16 @@ @replies.push(args) end # @api private def message_returned(reply_code, reply_text, exchange, routing_key) - @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, nil, "") + @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key) end # @api private def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) - @next_msg = Message.new(self, delivery_tag, exchange, routing_key, nil, "", redelivered, consumer_tag) + @next_msg = Message.new(self, consumer_tag, delivery_tag, exchange, routing_key, redelivered) end # @api private def basic_get_empty @basic_gets.push :basic_get_empty @@ -508,10 +507,11 @@ end # @api private def close_consumer(tag) @consumers.fetch(tag).close + nil end private def next_message_finished! @@ -526,9 +526,10 @@ @basic_gets.push next_msg else Thread.pass until (consumer = @consumers[next_msg.consumer_tag]) consumer.push next_msg end + nil ensure @next_msg = @next_body = @next_body_size = nil end def write_bytes(*bytes)