lib/amqp/client/channel.rb in amqp-client-0.1.0 vs lib/amqp/client/channel.rb in amqp-client-0.2.0
- old
+ new
@@ -4,98 +4,305 @@
module AMQP
# AMQP Channel
class Channel
def initialize(connection, id)
- @rpc = Queue.new
+ @replies = ::Queue.new
@connection = connection
@id = id
- @closed = false
+ @consumers = {}
+ @confirm = nil
+ @last_confirmed = 0
+ @closed = nil
+ @on_return = nil
+ @open = false
end
- attr_reader :id
+ attr_reader :id, :consumers
def open
+ return self if @open
+
write_bytes FrameBytes.channel_open(@id)
expect(:channel_open_ok)
+ @open = true
self
end
def close(reason = "", code = 200)
return if @closed
write_bytes FrameBytes.channel_close(@id, reason, code)
expect :channel_close_ok
- @closed = true
+ @closed = [code, reason]
end
- def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, **args)
+ # Called when closed by server
+ def closed!(code, reason, classid, methodid)
+ write_bytes FrameBytes.channel_close_ok(@id)
+ @closed = [code, reason, classid, methodid]
+ @replies.close
+ @consumers.each { |_, q| q.close }
+ @consumers.clear
+ end
+
+ def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, **args)
+ write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, args)
+ expect :exchange_declare_ok
+ end
+
+ def exchange_delete(name, if_unused: false, no_wait: false)
+ write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
+ expect :exchange_delete_ok
+ end
+
+ def exchange_bind(destination, source, binding_key, arguments = {})
+ write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
+ expect :exchange_bind_ok
+ end
+
+ def exchange_unbind(destination, source, binding_key, arguments = {})
+ write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
+ expect :exchange_unbind_ok
+ end
+
+ def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
durable = false if name.empty?
exclusive = true if name.empty?
auto_delete = true if name.empty?
- write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete)
+ write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
name, message_count, consumer_count = expect(:queue_declare_ok)
{
queue_name: name,
message_count: message_count,
consumer_count: consumer_count
}
end
- def basic_get(queue_name, no_ack: true)
- return if @closed
+ def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
+ write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
+ message_count, = expect :queue_delete
+ message_count
+ end
+ def queue_bind(name, exchange, binding_key, arguments = {})
+ write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
+ expect :queue_bind_ok
+ end
+
+ def queue_purge(name, no_wait: false)
+ write_bytes FrameBytes.queue_purge(@id, name, no_wait)
+ expect :queue_purge_ok unless no_wait
+ end
+
+ def queue_unbind(name, exchange, binding_key, arguments = {})
+ write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
+ expect :queue_unbind_ok
+ end
+
+ def basic_get(queue_name, no_ack: true)
write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
- resp = @rpc.shift
- frame, = resp
+ frame, *rest = @replies.shift
case frame
when :basic_get_ok
- _, exchange_name, routing_key, redelivered = resp
+ delivery_tag, exchange_name, routing_key, _message_count, redelivered = rest
body_size, properties = expect(:header)
pos = 0
- body = ""
+ body = String.new("", capacity: body_size)
while pos < body_size
- body_part = expect(:body)
+ body_part, = expect(:body)
body += body_part
pos += body_part.bytesize
end
- Message.new(exchange_name, routing_key, properties, body, redelivered)
- when :basic_get_empty
- nil
- else raise AMQP::Client::UnexpectedFrame, %i[basic_get_ok basic_get_empty], frame
+ Message.new(self, delivery_tag, exchange_name, routing_key, properties, body, redelivered)
+ when :basic_get_empty then nil
+ when nil then raise AMQP::Client::ChannelClosedError.new(@id, *@closed)
+ else raise AMQP::Client::UnexpectedFrame.new(%i[basic_get_ok basic_get_empty], frame)
end
end
- def basic_publish(exchange, routing_key, body, properties = {})
- raise AMQP::Client::ChannelClosedError, @id if @closed
+ def basic_publish(body, exchange, routing_key, **properties)
+ frame_max = @connection.frame_max - 8
+ id = @id
- write_bytes FrameBytes.basic_publish(@id, exchange, routing_key)
- write_bytes FrameBytes.header(@id, body.bytesize, properties)
+ if 0 < body.bytesize && body.bytesize <= frame_max
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false),
+ FrameBytes.header(id, body.bytesize, properties),
+ FrameBytes.body(id, body)
+ return @confirm ? @confirm += 1 : nil
+ end
- # body frames, splitted on frame size
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, properties.delete(:mandatory) || false),
+ FrameBytes.header(id, body.bytesize, properties)
pos = 0
- while pos < body.bytesize
- len = [4096, body.bytesize - pos].min
+ while pos < body.bytesize # split body into multiple frame_max frames
+ len = [frame_max, body.bytesize - pos].min
body_part = body.byteslice(pos, len)
- write_bytes FrameBytes.body(@id, body_part)
+ write_bytes FrameBytes.body(id, body_part)
pos += len
end
+ @confirm += 1 if @confirm
end
- def push(*args)
- @rpc.push(*args)
+ def basic_publish_confirm(body, exchange, routing_key, **properties)
+ confirm_select(no_wait: true)
+ id = basic_publish(body, exchange, routing_key, **properties)
+ wait_for_confirm(id)
end
+ # Consume from a queue
+ # worker_threads: 0 => blocking, messages are executed in the thread calling this method
+ def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {},
+ worker_threads: 1)
+ write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
+ tag, = expect(:basic_consume_ok)
+ q = @consumers[tag] = ::Queue.new
+ msgs = ::Queue.new
+ Thread.new { recv_deliveries(tag, q, msgs) }
+ if worker_threads.zero?
+ while (msg = msgs.shift)
+ yield msg
+ end
+ else
+ threads = Array.new(worker_threads) do
+ Thread.new do
+ while (msg = msgs.shift)
+ yield(msg)
+ end
+ end
+ end
+ [tag, threads]
+ end
+ end
+
+ def basic_cancel(consumer_tag, no_wait: false)
+ consumer = @consumers.fetch(consumer_tag)
+ return if consumer.closed?
+
+ write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
+ expect(:basic_cancel_ok) unless no_wait
+ consumer.close
+ end
+
+ def basic_qos(prefetch_count, prefetch_size: 0, global: false)
+ write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
+ expect :basic_qos_ok
+ end
+
+ def basic_ack(delivery_tag, multiple: false)
+ write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
+ end
+
+ def basic_nack(delivery_tag, multiple: false, requeue: false)
+ write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
+ end
+
+ def basic_reject(delivery_tag, requeue: false)
+ write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
+ end
+
+ def basic_recover(requeue: false)
+ write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
+ expect :basic_recover_ok
+ end
+
+ def confirm_select(no_wait: false)
+ return if @confirm
+
+ write_bytes FrameBytes.confirm_select(@id, no_wait)
+ expect :confirm_select_ok unless no_wait
+ @confirms = ::Queue.new
+ @confirm = 0
+ end
+
+ def wait_for_confirm(id)
+ raise ArgumentError, "Confirm id has to a positive number" unless id&.positive?
+ return true if @last_confirmed >= id
+
+ loop do
+ ack, delivery_tag, multiple = @confirms.shift || break
+ @last_confirmed = delivery_tag
+ return ack if delivery_tag == id || (delivery_tag > id && multiple)
+ end
+ false
+ end
+
+ def tx_select
+ write_bytes FrameBytes.tx_select(@id)
+ expect :tx_select_ok
+ end
+
+ def tx_commit
+ write_bytes FrameBytes.tx_commit(@id)
+ expect :tx_commit_ok
+ end
+
+ def tx_rollback
+ write_bytes FrameBytes.tx_rollback(@id)
+ expect :tx_rollback_ok
+ end
+
+ def reply(args)
+ @replies.push(args)
+ end
+
+ def confirm(args)
+ @confirms.push(args)
+ end
+
+ def message_returned(reply_code, reply_text, exchange, routing_key)
+ Thread.new do
+ body_size, properties = expect(:header)
+ body = String.new("", capacity: body_size)
+ while body.bytesize < body_size
+ body_part, = expect(:body)
+ body += body_part
+ end
+ msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, properties, body)
+
+ if @on_return
+ @on_return.call(msg)
+ else
+ puts "[WARN] Message returned: #{msg.inspect}"
+ end
+ end
+ end
+
+ def on_return(&block)
+ @on_return = block
+ end
+
private
- def write_bytes(bytes)
- @connection.write_bytes bytes
+ def recv_deliveries(consumer_tag, deliver_queue, msgs)
+ loop do
+ _, delivery_tag, redelivered, exchange, routing_key = deliver_queue.shift || raise(ClosedQueueError)
+ body_size, properties = expect(:header)
+ body = String.new("", capacity: body_size)
+ while body.bytesize < body_size
+ body_part, = expect(:body)
+ body += body_part
+ end
+ msgs.push Message.new(self, delivery_tag, exchange, routing_key, properties, body, redelivered, consumer_tag)
+ end
+ ensure
+ msgs.close
end
+ def write_bytes(*bytes)
+ raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if @closed
+
+ @connection.write_bytes(*bytes)
+ end
+
def expect(expected_frame_type)
- frame_type, args = @rpc.shift
- frame_type == expected_frame_type || raise(UnexpectedFrame, expected_frame_type, frame_type)
- args
+ loop do
+ frame_type, *args = @replies.shift
+ raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if frame_type.nil?
+ return args if frame_type == expected_frame_type
+
+ @replies.push [frame_type, *args]
+ end
end
end
end