lib/amqp/client/channel.rb in amqp-client-0.2.3 vs lib/amqp/client/channel.rb in amqp-client-0.3.0

- old
+ new

@@ -4,19 +4,20 @@ module AMQP # AMQP Channel class Channel def initialize(connection, id) - @replies = ::Queue.new @connection = connection @id = id + @replies = ::Queue.new @consumers = {} - @confirm = nil - @last_confirmed = 0 @closed = nil - @on_return = nil @open = false + @on_return = nil + @confirm = nil + @unconfirmed = ::Queue.new + @unconfirmed_empty = ::Queue.new end attr_reader :id, :consumers def open @@ -122,34 +123,37 @@ end def basic_publish(body, exchange, routing_key, **properties) frame_max = @connection.frame_max - 8 id = @id + mandatory = properties.delete(:mandatory) || false if 0 < body.bytesize && body.bytesize <= frame_max - write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false), + write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) - return @confirm ? @confirm += 1 : nil + @unconfirmed.push @confirm += 1 if @confirm + return end - write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false), + write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties) pos = 0 while pos < body.bytesize # split body into multiple frame_max frames len = [frame_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) write_bytes FrameBytes.body(id, body_part) pos += len end - @confirm += 1 if @confirm + @unconfirmed.push @confirm += 1 if @confirm + nil end def basic_publish_confirm(body, exchange, routing_key, **properties) confirm_select(no_wait: true) - id = basic_publish(body, exchange, routing_key, **properties) - wait_for_confirm(id) + basic_publish(body, exchange, routing_key, **properties) + wait_for_confirms end # Consume from a queue # worker_threads: 0 => blocking, messages are executed in the thread calling this method def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, @@ -209,24 +213,36 @@ def confirm_select(no_wait: false) return if @confirm write_bytes FrameBytes.confirm_select(@id, no_wait) expect :confirm_select_ok unless no_wait - @confirms = ::Queue.new @confirm = 0 end - def wait_for_confirm(id) - raise ArgumentError, "Confirm id has to a positive number" unless id&.positive? - return true if @last_confirmed >= id + # Block until all publishes messages are confirmed + def wait_for_confirms + return true if @unconfirmed.empty? + @unconfirmed_empty.pop + end + + def confirm(args) + ack_or_nack, delivery_tag, multiple = *args loop do - ack, delivery_tag, multiple = @confirms.shift || break - @last_confirmed = delivery_tag - return ack if delivery_tag == id || (delivery_tag > id && multiple) + tag = @unconfirmed.pop(true) + break if tag == delivery_tag + next if multiple && tag < delivery_tag + + @unconfirmed << tag # requeue + rescue ThreadError + break end - false + return unless @unconfirmed.empty? + + @unconfirmed_empty.num_waiting.times do + @unconfirmed_empty << ack_or_nack == :ack + end end def tx_select write_bytes FrameBytes.tx_select(@id) expect :tx_select_ok @@ -242,13 +258,9 @@ expect :tx_rollback_ok end def reply(args) @replies.push(args) - end - - def confirm(args) - @confirms.push(args) end def message_returned(reply_code, reply_text, exchange, routing_key) Thread.new do body_size, properties = expect(:header)