lib/amqp/client/channel.rb in amqp-client-0.1.0 vs lib/amqp/client/channel.rb in amqp-client-0.2.0

- old
+ new

@@ -4,98 +4,305 @@ module AMQP # AMQP Channel class Channel def initialize(connection, id) - @rpc = Queue.new + @replies = ::Queue.new @connection = connection @id = id - @closed = false + @consumers = {} + @confirm = nil + @last_confirmed = 0 + @closed = nil + @on_return = nil + @open = false end - attr_reader :id + attr_reader :id, :consumers def open + return self if @open + write_bytes FrameBytes.channel_open(@id) expect(:channel_open_ok) + @open = true self end def close(reason = "", code = 200) return if @closed write_bytes FrameBytes.channel_close(@id, reason, code) expect :channel_close_ok - @closed = true + @closed = [code, reason] end - def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, **args) + # Called when closed by server + def closed!(code, reason, classid, methodid) + write_bytes FrameBytes.channel_close_ok(@id) + @closed = [code, reason, classid, methodid] + @replies.close + @consumers.each { |_, q| q.close } + @consumers.clear + end + + def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, **args) + write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, args) + expect :exchange_declare_ok + end + + def exchange_delete(name, if_unused: false, no_wait: false) + write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait) + expect :exchange_delete_ok + end + + def exchange_bind(destination, source, binding_key, arguments = {}) + write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments) + expect :exchange_bind_ok + end + + def exchange_unbind(destination, source, binding_key, arguments = {}) + write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments) + expect :exchange_unbind_ok + end + + def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) durable = false if name.empty? exclusive = true if name.empty? auto_delete = true if name.empty? - write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete) + write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments) name, message_count, consumer_count = expect(:queue_declare_ok) { queue_name: name, message_count: message_count, consumer_count: consumer_count } end - def basic_get(queue_name, no_ack: true) - return if @closed + def queue_delete(name, if_unused: false, if_empty: false, no_wait: false) + write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait) + message_count, = expect :queue_delete + message_count + end + def queue_bind(name, exchange, binding_key, arguments = {}) + write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments) + expect :queue_bind_ok + end + + def queue_purge(name, no_wait: false) + write_bytes FrameBytes.queue_purge(@id, name, no_wait) + expect :queue_purge_ok unless no_wait + end + + def queue_unbind(name, exchange, binding_key, arguments = {}) + write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments) + expect :queue_unbind_ok + end + + def basic_get(queue_name, no_ack: true) write_bytes FrameBytes.basic_get(@id, queue_name, no_ack) - resp = @rpc.shift - frame, = resp + frame, *rest = @replies.shift case frame when :basic_get_ok - _, exchange_name, routing_key, redelivered = resp + delivery_tag, exchange_name, routing_key, _message_count, redelivered = rest body_size, properties = expect(:header) pos = 0 - body = "" + body = String.new("", capacity: body_size) while pos < body_size - body_part = expect(:body) + body_part, = expect(:body) body += body_part pos += body_part.bytesize end - Message.new(exchange_name, routing_key, properties, body, redelivered) - when :basic_get_empty - nil - else raise AMQP::Client::UnexpectedFrame, %i[basic_get_ok basic_get_empty], frame + Message.new(self, delivery_tag, exchange_name, routing_key, properties, body, redelivered) + when :basic_get_empty then nil + when nil then raise AMQP::Client::ChannelClosedError.new(@id, *@closed) + else raise AMQP::Client::UnexpectedFrame.new(%i[basic_get_ok basic_get_empty], frame) end end - def basic_publish(exchange, routing_key, body, properties = {}) - raise AMQP::Client::ChannelClosedError, @id if @closed + def basic_publish(body, exchange, routing_key, **properties) + frame_max = @connection.frame_max - 8 + id = @id - write_bytes FrameBytes.basic_publish(@id, exchange, routing_key) - write_bytes FrameBytes.header(@id, body.bytesize, properties) + if 0 < body.bytesize && body.bytesize <= frame_max + write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false), + FrameBytes.header(id, body.bytesize, properties), + FrameBytes.body(id, body) + return @confirm ? @confirm += 1 : nil + end - # body frames, splitted on frame size + write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false), + FrameBytes.header(id, body.bytesize, properties) pos = 0 - while pos < body.bytesize - len = [4096, body.bytesize - pos].min + while pos < body.bytesize # split body into multiple frame_max frames + len = [frame_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) - write_bytes FrameBytes.body(@id, body_part) + write_bytes FrameBytes.body(id, body_part) pos += len end + @confirm += 1 if @confirm end - def push(*args) - @rpc.push(*args) + def basic_publish_confirm(body, exchange, routing_key, **properties) + confirm_select(no_wait: true) + id = basic_publish(body, exchange, routing_key, **properties) + wait_for_confirm(id) end + # Consume from a queue + # worker_threads: 0 => blocking, messages are executed in the thread calling this method + def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, + worker_threads: 1) + write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments) + tag, = expect(:basic_consume_ok) + q = @consumers[tag] = ::Queue.new + msgs = ::Queue.new + Thread.new { recv_deliveries(tag, q, msgs) } + if worker_threads.zero? + while (msg = msgs.shift) + yield msg + end + else + threads = Array.new(worker_threads) do + Thread.new do + while (msg = msgs.shift) + yield(msg) + end + end + end + [tag, threads] + end + end + + def basic_cancel(consumer_tag, no_wait: false) + consumer = @consumers.fetch(consumer_tag) + return if consumer.closed? + + write_bytes FrameBytes.basic_cancel(@id, consumer_tag) + expect(:basic_cancel_ok) unless no_wait + consumer.close + end + + def basic_qos(prefetch_count, prefetch_size: 0, global: false) + write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global) + expect :basic_qos_ok + end + + def basic_ack(delivery_tag, multiple: false) + write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple) + end + + def basic_nack(delivery_tag, multiple: false, requeue: false) + write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue) + end + + def basic_reject(delivery_tag, requeue: false) + write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue) + end + + def basic_recover(requeue: false) + write_bytes FrameBytes.basic_recover(@id, requeue: requeue) + expect :basic_recover_ok + end + + def confirm_select(no_wait: false) + return if @confirm + + write_bytes FrameBytes.confirm_select(@id, no_wait) + expect :confirm_select_ok unless no_wait + @confirms = ::Queue.new + @confirm = 0 + end + + def wait_for_confirm(id) + raise ArgumentError, "Confirm id has to a positive number" unless id&.positive? + return true if @last_confirmed >= id + + loop do + ack, delivery_tag, multiple = @confirms.shift || break + @last_confirmed = delivery_tag + return ack if delivery_tag == id || (delivery_tag > id && multiple) + end + false + end + + def tx_select + write_bytes FrameBytes.tx_select(@id) + expect :tx_select_ok + end + + def tx_commit + write_bytes FrameBytes.tx_commit(@id) + expect :tx_commit_ok + end + + def tx_rollback + write_bytes FrameBytes.tx_rollback(@id) + expect :tx_rollback_ok + end + + def reply(args) + @replies.push(args) + end + + def confirm(args) + @confirms.push(args) + end + + def message_returned(reply_code, reply_text, exchange, routing_key) + Thread.new do + body_size, properties = expect(:header) + body = String.new("", capacity: body_size) + while body.bytesize < body_size + body_part, = expect(:body) + body += body_part + end + msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, properties, body) + + if @on_return + @on_return.call(msg) + else + puts "[WARN] Message returned: #{msg.inspect}" + end + end + end + + def on_return(&block) + @on_return = block + end + private - def write_bytes(bytes) - @connection.write_bytes bytes + def recv_deliveries(consumer_tag, deliver_queue, msgs) + loop do + _, delivery_tag, redelivered, exchange, routing_key = deliver_queue.shift || raise(ClosedQueueError) + body_size, properties = expect(:header) + body = String.new("", capacity: body_size) + while body.bytesize < body_size + body_part, = expect(:body) + body += body_part + end + msgs.push Message.new(self, delivery_tag, exchange, routing_key, properties, body, redelivered, consumer_tag) + end + ensure + msgs.close end + def write_bytes(*bytes) + raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if @closed + + @connection.write_bytes(*bytes) + end + def expect(expected_frame_type) - frame_type, args = @rpc.shift - frame_type == expected_frame_type || raise(UnexpectedFrame, expected_frame_type, frame_type) - args + loop do + frame_type, *args = @replies.shift + raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if frame_type.nil? + return args if frame_type == expected_frame_type + + @replies.push [frame_type, *args] + end end end end