lib/amqp/client/channel.rb in amqp-client-0.3.0 vs lib/amqp/client/channel.rb in amqp-client-1.0.0

- old
+ new

@@ -14,122 +14,123 @@ @open = false @on_return = nil @confirm = nil @unconfirmed = ::Queue.new @unconfirmed_empty = ::Queue.new + @basic_gets = ::Queue.new end - attr_reader :id, :consumers + def inspect + "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?}"\ + " consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>" + end + attr_reader :id + def open return self if @open + @open = true write_bytes FrameBytes.channel_open(@id) expect(:channel_open_ok) - @open = true self end def close(reason = "", code = 200) return if @closed write_bytes FrameBytes.channel_close(@id, reason, code) - expect :channel_close_ok @closed = [code, reason] + expect :channel_close_ok + @replies.close + @basic_gets.close + @unconfirmed_empty.close + @consumers.each_value(&:close) end - # Called when closed by server + # Called when channel is closed by server def closed!(code, reason, classid, methodid) - write_bytes FrameBytes.channel_close_ok(@id) @closed = [code, reason, classid, methodid] @replies.close - @consumers.each { |_, q| q.close } - @consumers.clear + @basic_gets.close + @unconfirmed_empty.close + @consumers.each_value(&:close) end - def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, **args) - write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, args) + def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) + write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments) expect :exchange_declare_ok end def exchange_delete(name, if_unused: false, no_wait: false) write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait) expect :exchange_delete_ok end - def exchange_bind(destination, source, binding_key, arguments = {}) + def exchange_bind(destination, source, binding_key, arguments: {}) write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments) expect :exchange_bind_ok end - def exchange_unbind(destination, source, binding_key, arguments = {}) + def exchange_unbind(destination, source, binding_key, arguments: {}) write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments) expect :exchange_unbind_ok end + QueueOk = Struct.new(:queue_name, :message_count, :consumer_count) + def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) durable = false if name.empty? exclusive = true if name.empty? auto_delete = true if name.empty? write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments) name, message_count, consumer_count = expect(:queue_declare_ok) - { - queue_name: name, - message_count: message_count, - consumer_count: consumer_count - } + + QueueOk.new(name, message_count, consumer_count) end def queue_delete(name, if_unused: false, if_empty: false, no_wait: false) write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait) message_count, = expect :queue_delete message_count end - def queue_bind(name, exchange, binding_key, arguments = {}) + def queue_bind(name, exchange, binding_key, arguments: {}) write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments) expect :queue_bind_ok end def queue_purge(name, no_wait: false) write_bytes FrameBytes.queue_purge(@id, name, no_wait) expect :queue_purge_ok unless no_wait end - def queue_unbind(name, exchange, binding_key, arguments = {}) + def queue_unbind(name, exchange, binding_key, arguments: {}) write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments) expect :queue_unbind_ok end def basic_get(queue_name, no_ack: true) write_bytes FrameBytes.basic_get(@id, queue_name, no_ack) - frame, *rest = @replies.shift - case frame - when :basic_get_ok - delivery_tag, exchange_name, routing_key, _message_count, redelivered = rest - body_size, properties = expect(:header) - pos = 0 - body = String.new("", capacity: body_size) - while pos < body_size - body_part, = expect(:body) - body += body_part - pos += body_part.bytesize - end - Message.new(self, delivery_tag, exchange_name, routing_key, properties, body, redelivered) + case (msg = @basic_gets.pop) + when Message then msg when :basic_get_empty then nil when nil then raise AMQP::Client::ChannelClosedError.new(@id, *@closed) - else raise AMQP::Client::UnexpectedFrame.new(%i[basic_get_ok basic_get_empty], frame) end end def basic_publish(body, exchange, routing_key, **properties) frame_max = @connection.frame_max - 8 id = @id mandatory = properties.delete(:mandatory) || false + case properties.delete(:persistent) + when true then properties[:delivery_mode] = 2 + when false then properties[:delivery_mode] = 1 + end - if 0 < body.bytesize && body.bytesize <= frame_max + if body.bytesize.between?(1, frame_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) @unconfirmed.push @confirm += 1 if @confirm return @@ -154,26 +155,23 @@ wait_for_confirms end # Consume from a queue # worker_threads: 0 => blocking, messages are executed in the thread calling this method - def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, - worker_threads: 1) + def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments) tag, = expect(:basic_consume_ok) q = @consumers[tag] = ::Queue.new - msgs = ::Queue.new - Thread.new { recv_deliveries(tag, q, msgs) } if worker_threads.zero? - while (msg = msgs.shift) - yield msg + loop do + yield (q.pop || break) end else threads = Array.new(worker_threads) do Thread.new do - while (msg = msgs.shift) - yield(msg) + loop do + yield (q.pop || break) end end end [tag, threads] end @@ -220,13 +218,18 @@ # Block until all publishes messages are confirmed def wait_for_confirms return true if @unconfirmed.empty? - @unconfirmed_empty.pop + case @unconfirmed_empty.pop + when true then true + when false then false + else raise AMQP::Client::ChannelClosedError.new(@id, *@closed) + end end + # Called by Connection when received ack/nack from server def confirm(args) ack_or_nack, delivery_tag, multiple = *args loop do tag = @unconfirmed.pop(true) break if tag == delivery_tag @@ -237,11 +240,11 @@ break end return unless @unconfirmed.empty? @unconfirmed_empty.num_waiting.times do - @unconfirmed_empty << ack_or_nack == :ack + @unconfirmed_empty << (ack_or_nack == :ack) end end def tx_select write_bytes FrameBytes.tx_select(@id) @@ -256,65 +259,82 @@ def tx_rollback write_bytes FrameBytes.tx_rollback(@id) expect :tx_rollback_ok end + def on_return(&block) + @on_return = block + end + def reply(args) @replies.push(args) end def message_returned(reply_code, reply_text, exchange, routing_key) - Thread.new do - body_size, properties = expect(:header) - body = String.new("", capacity: body_size) - while body.bytesize < body_size - body_part, = expect(:body) - body += body_part - end - msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, properties, body) + @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, nil, "") + end - if @on_return - @on_return.call(msg) - else - puts "[WARN] Message returned: #{msg.inspect}" - end + def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key) + @next_msg = Message.new(self, delivery_tag, exchange, routing_key, nil, "", redelivered, consumer_tag) + end + + def basic_get_empty + @basic_gets.push :basic_get_empty + end + + def header_delivered(body_size, properties) + @next_msg.properties = properties + if body_size.zero? + next_message_finished! + else + @next_body = StringIO.new(String.new(capacity: body_size)) + @next_body_size = body_size end end - def on_return(&block) - @on_return = block + def body_delivered(body_part) + @next_body.write(body_part) + return unless @next_body.pos == @next_body_size + + @next_msg.body = @next_body.string + next_message_finished! end + def close_consumer(tag) + @consumers.fetch(tag).close + end + private - def recv_deliveries(consumer_tag, deliver_queue, msgs) - loop do - _, delivery_tag, redelivered, exchange, routing_key = deliver_queue.shift || raise(ClosedQueueError) - body_size, properties = expect(:header) - body = String.new("", capacity: body_size) - while body.bytesize < body_size - body_part, = expect(:body) - body += body_part + def next_message_finished! + next_msg = @next_msg + if next_msg.is_a? ReturnMessage + if @on_return + Thread.new { @on_return.call(next_msg) } + else + warn "AMQP-Client message returned: #{msg.inspect}" end - msgs.push Message.new(self, delivery_tag, exchange, routing_key, properties, body, redelivered, consumer_tag) + elsif next_msg.consumer_tag.nil? + @basic_gets.push next_msg + else + Thread.pass until (consumer = @consumers[next_msg.consumer_tag]) + consumer.push next_msg end ensure - msgs.close + @next_msg = @next_body = @next_body_size = nil end def write_bytes(*bytes) raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if @closed @connection.write_bytes(*bytes) end def expect(expected_frame_type) - loop do - frame_type, *args = @replies.shift - raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if frame_type.nil? - return args if frame_type == expected_frame_type + frame_type, *args = @replies.pop + raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if frame_type.nil? + raise AMQP::Client::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type - @replies.push [frame_type, *args] - end + args end end end