lib/amqp/client/channel.rb in amqp-client-1.0.0 vs lib/amqp/client/channel.rb in amqp-client-1.0.1
- old
+ new
@@ -1,340 +1,545 @@
# frozen_string_literal: true
require_relative "./message"
module AMQP
- # AMQP Channel
- class Channel
- def initialize(connection, id)
- @connection = connection
- @id = id
- @replies = ::Queue.new
- @consumers = {}
- @closed = nil
- @open = false
- @on_return = nil
- @confirm = nil
- @unconfirmed = ::Queue.new
- @unconfirmed_empty = ::Queue.new
- @basic_gets = ::Queue.new
- end
+ class Client
+ class Connection
+ # AMQP Channel
+ class Channel
+ # Should only be called from Connection
+ # @param connection [Connection] The connection this channel belongs to
+ # @param id [Integer] ID of the channel
+ # @see Connection#channel
+ # @api private
+ def initialize(connection, id)
+ @connection = connection
+ @id = id
+ @replies = ::Queue.new
+ @consumers = {}
+ @closed = nil
+ @open = false
+ @on_return = nil
+ @confirm = nil
+ @unconfirmed = ::Queue.new
+ @unconfirmed_empty = ::Queue.new
+ @basic_gets = ::Queue.new
+ end
- def inspect
- "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?}"\
- " consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
- end
+ # Override #inspect
+ # @api private
+ def inspect
+ "#<#{self.class} @id=#{@id} @open=#{@open} @closed=#{@closed} confirm_selected=#{!@confirm.nil?}"\
+ " consumer_count=#{@consumers.size} replies_count=#{@replies.size} unconfirmed_count=#{@unconfirmed.size}>"
+ end
- attr_reader :id
+ # Channel ID
+ # @return [Integer]
+ attr_reader :id
- def open
- return self if @open
+ # Open the channel (called from Connection)
+ # @return [Channel] self
+ # @api private
+ def open
+ return self if @open
- @open = true
- write_bytes FrameBytes.channel_open(@id)
- expect(:channel_open_ok)
- self
- end
+ @open = true
+ write_bytes FrameBytes.channel_open(@id)
+ expect(:channel_open_ok)
+ self
+ end
- def close(reason = "", code = 200)
- return if @closed
+ # Gracefully close a connection
+ # @return [nil]
+ def close(reason: "", code: 200)
+ return if @closed
- write_bytes FrameBytes.channel_close(@id, reason, code)
- @closed = [code, reason]
- expect :channel_close_ok
- @replies.close
- @basic_gets.close
- @unconfirmed_empty.close
- @consumers.each_value(&:close)
- end
+ write_bytes FrameBytes.channel_close(@id, reason, code)
+ @closed = [code, reason]
+ expect :channel_close_ok
+ @replies.close
+ @basic_gets.close
+ @unconfirmed_empty.close
+ @consumers.each_value(&:close)
+ nil
+ end
- # Called when channel is closed by server
- def closed!(code, reason, classid, methodid)
- @closed = [code, reason, classid, methodid]
- @replies.close
- @basic_gets.close
- @unconfirmed_empty.close
- @consumers.each_value(&:close)
- end
+ # Called when channel is closed by server
+ # @return [nil]
+ # @api private
+ def closed!(code, reason, classid, methodid)
+ @closed = [code, reason, classid, methodid]
+ @replies.close
+ @basic_gets.close
+ @unconfirmed_empty.close
+ @consumers.each_value(&:close)
+ nil
+ end
- def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
- write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
- expect :exchange_declare_ok
- end
+ # Handle returned messages in this block. If not set the message will just be logged to STDERR
+ # @yield [ReturnMessage] Messages returned by the server when a publish has failed
+ # @return nil
+ def on_return(&block)
+ @on_return = block
+ nil
+ end
- def exchange_delete(name, if_unused: false, no_wait: false)
- write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
- expect :exchange_delete_ok
- end
+ # @!group Exchange
- def exchange_bind(destination, source, binding_key, arguments: {})
- write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
- expect :exchange_bind_ok
- end
+ # Declare an exchange
+ # @param name [String] Name of the exchange
+ # @param type [String] Type of exchange (amq.direct, amq.fanout, amq.topic, amq.headers, etc.)
+ # @param passive [Boolean] If true raise an exception if the exchange doesn't already exists
+ # @param durable [Boolean] If true the exchange will persist between broker restarts,
+ # also a requirement for persistent messages
+ # @param auto_delete [Boolean] If true the exchange will be deleted when the last queue/exchange is unbound
+ # @param internal [Boolean] If true the exchange can't be published to directly
+ # @param arguments [Hash] Custom arguments
+ # @return [nil]
+ def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
+ write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
+ expect :exchange_declare_ok
+ nil
+ end
- def exchange_unbind(destination, source, binding_key, arguments: {})
- write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
- expect :exchange_unbind_ok
- end
+ # Delete an exchange
+ # @param name [String] Name of the exchange
+ # @param if_unused [Boolean] If true raise an exception if queues/exchanges is bound to this exchange
+ # @param no_wait [Boolean] If true don't wait for a server confirmation
+ # @return [nil]
+ def exchange_delete(name, if_unused: false, no_wait: false)
+ write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
+ expect :exchange_delete_ok unless no_wait
+ nil
+ end
- QueueOk = Struct.new(:queue_name, :message_count, :consumer_count)
+ # Bind an exchange to another exchange
+ # @param destination [String] Name of the exchange to bind
+ # @param source [String] Name of the exchange to bind to
+ # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
+ # @param arguments [Hash] Message headers to match on, but only when bound to header exchanges
+ # @return [nil]
+ def exchange_bind(destination, source, binding_key, arguments: {})
+ write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
+ expect :exchange_bind_ok
+ nil
+ end
- def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
- durable = false if name.empty?
- exclusive = true if name.empty?
- auto_delete = true if name.empty?
+ # Unbind an exchange from another exchange
+ # @param destination [String] Name of the exchange to unbind
+ # @param source [String] Name of the exchange to unbind from
+ # @param binding_key [String] Binding key which the queue is bound to the exchange with
+ # @param arguments [Hash] Arguments matching the binding that's being removed
+ # @return [nil]
+ def exchange_unbind(destination, source, binding_key, arguments: {})
+ write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
+ expect :exchange_unbind_ok
+ nil
+ end
- write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
- name, message_count, consumer_count = expect(:queue_declare_ok)
+ # @!endgroup
+ # @!group Queue
- QueueOk.new(name, message_count, consumer_count)
- end
+ # Response when declaring a Queue
+ # @!attribute queue_name
+ # @return [String] The name of the queue
+ # @!attribute message_count
+ # @return [Integer] Number of messages in the queue at the time of declaration
+ # @!attribute consumer_count
+ # @return [Integer] Number of consumers subscribed to the queue at the time of declaration
+ QueueOk = Struct.new(:queue_name, :message_count, :consumer_count)
- def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
- write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
- message_count, = expect :queue_delete
- message_count
- end
+ # Create a queue (operation is idempotent)
+ # @param name [String] Name of the queue, can be empty, but will then be generated by the broker
+ # @param passive [Boolean] If true an exception will be raised if the queue doesn't already exists
+ # @param durable [Boolean] If true the queue will survive broker restarts,
+ # messages in the queue will only survive if they are published as persistent
+ # @param exclusive [Boolean] If true the queue will be deleted when the channel is closed
+ # @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
+ # (it won't be deleted until at least one consumer has consumed from it)
+ # @param arguments [Hash] Custom arguments, such as queue-ttl etc.
+ # @return [QueueOk] The QueueOk struct got `queue_name`, `message_count` and `consumer_count` properties
+ def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
+ durable = false if name.empty?
+ exclusive = true if name.empty?
+ auto_delete = true if name.empty?
- def queue_bind(name, exchange, binding_key, arguments: {})
- write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
- expect :queue_bind_ok
- end
+ write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
+ name, message_count, consumer_count = expect(:queue_declare_ok)
- def queue_purge(name, no_wait: false)
- write_bytes FrameBytes.queue_purge(@id, name, no_wait)
- expect :queue_purge_ok unless no_wait
- end
+ QueueOk.new(name, message_count, consumer_count)
+ end
- def queue_unbind(name, exchange, binding_key, arguments: {})
- write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
- expect :queue_unbind_ok
- end
+ # Delete a queue
+ # @param name [String] Name of the queue
+ # @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise
+ # @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise
+ # @param no_wait [Boolean] Don't wait for a server confirmation if true
+ # @return [Integer] Number of messages in queue when deleted
+ # @return [nil] If no_wait was set true
+ def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
+ write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
+ message_count, = expect :queue_delete unless no_wait
+ message_count
+ end
- def basic_get(queue_name, no_ack: true)
- write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
- case (msg = @basic_gets.pop)
- when Message then msg
- when :basic_get_empty then nil
- when nil then raise AMQP::Client::ChannelClosedError.new(@id, *@closed)
- end
- end
+ # Bind a queue to an exchange
+ # @param name [String] Name of the queue to bind
+ # @param exchange [String] Name of the exchange to bind to
+ # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
+ # @param arguments [Hash] Message headers to match on, but only when bound to header exchanges
+ # @return [nil]
+ def queue_bind(name, exchange, binding_key, arguments: {})
+ write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
+ expect :queue_bind_ok
+ nil
+ end
- def basic_publish(body, exchange, routing_key, **properties)
- frame_max = @connection.frame_max - 8
- id = @id
- mandatory = properties.delete(:mandatory) || false
- case properties.delete(:persistent)
- when true then properties[:delivery_mode] = 2
- when false then properties[:delivery_mode] = 1
- end
+ # Purge a queue
+ # @param name [String] Name of the queue
+ # @param no_wait [Boolean] Don't wait for a server confirmation if true
+ # @return [nil]
+ def queue_purge(name, no_wait: false)
+ write_bytes FrameBytes.queue_purge(@id, name, no_wait)
+ expect :queue_purge_ok unless no_wait
+ nil
+ end
- if body.bytesize.between?(1, frame_max)
- write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
- FrameBytes.header(id, body.bytesize, properties),
- FrameBytes.body(id, body)
- @unconfirmed.push @confirm += 1 if @confirm
- return
- end
+ # Unbind a queue from an exchange
+ # @param name [String] Name of the queue to unbind
+ # @param exchange [String] Name of the exchange to unbind from
+ # @param binding_key [String] Binding key which the queue is bound to the exchange with
+ # @param arguments [Hash] Arguments matching the binding that's being removed
+ # @return [nil]
+ def queue_unbind(name, exchange, binding_key, arguments: {})
+ write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
+ expect :queue_unbind_ok
+ end
- write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
- FrameBytes.header(id, body.bytesize, properties)
- pos = 0
- while pos < body.bytesize # split body into multiple frame_max frames
- len = [frame_max, body.bytesize - pos].min
- body_part = body.byteslice(pos, len)
- write_bytes FrameBytes.body(id, body_part)
- pos += len
- end
- @unconfirmed.push @confirm += 1 if @confirm
- nil
- end
+ # @!endgroup
+ # @!group Basic
- def basic_publish_confirm(body, exchange, routing_key, **properties)
- confirm_select(no_wait: true)
- basic_publish(body, exchange, routing_key, **properties)
- wait_for_confirms
- end
+ # Get a message from a queue (by polling)
+ # @param queue_name [String]
+ # @param no_ack [Boolean] When false the message have to be manually acknowledged
+ # @return [Message] If the queue had a message
+ # @return [nil] If the queue doesn't have any messages
+ def basic_get(queue_name, no_ack: true)
+ write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
+ case (msg = @basic_gets.pop)
+ when Message then msg
+ when :basic_get_empty then nil
+ when nil then raise Error::ChannelClosed.new(@id, *@closed)
+ end
+ end
- # Consume from a queue
- # worker_threads: 0 => blocking, messages are executed in the thread calling this method
- def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
- write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
- tag, = expect(:basic_consume_ok)
- q = @consumers[tag] = ::Queue.new
- if worker_threads.zero?
- loop do
- yield (q.pop || break)
+ # Publishes a message to an exchange
+ # @param body [String] The body, can be a string or a byte array
+ # @param exchange [String] Name of the exchange to publish to
+ # @param routing_key [String] The routing key that the exchange might use to route the message to a queue
+ # @param properties [Properties]
+ # @option properties [String] content_type Content type of the message body
+ # @option properties [String] content_encoding Content encoding of the body
+ # @option properties [Hash<String, Object>] headers Custom headers
+ # @option properties [Integer] delivery_mode 2 for persisted message, transient messages for all other values
+ # @option properties [Integer] priority A priority of the message (between 0 and 255)
+ # @option properties [Integer] correlation_id A correlation id, most often used used for RPC communication
+ # @option properties [String] reply_to Queue to reply RPC responses to
+ # @option properties [Integer, String] expiration Number of seconds the message will stay in the queue
+ # @option properties [String] message_id Can be used to uniquely identify the message, e.g. for deduplication
+ # @option properties [Date] timestamp Often used for the time the message was originally generated
+ # @option properties [String] type Can indicate what kind of message this is
+ # @option properties [String] user_id Can be used to verify that this is the user that published the message
+ # @option properties [String] app_id Can be used to indicates which app that generated the message
+ # @return [nil]
+ def basic_publish(body, exchange, routing_key, **properties)
+ frame_max = @connection.frame_max - 8
+ id = @id
+ mandatory = properties.delete(:mandatory) || false
+ case properties.delete(:persistent)
+ when true then properties[:delivery_mode] = 2
+ when false then properties[:delivery_mode] = 1
+ end
+
+ if body.bytesize.between?(1, frame_max)
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
+ FrameBytes.header(id, body.bytesize, properties),
+ FrameBytes.body(id, body)
+ @unconfirmed.push @confirm += 1 if @confirm
+ return
+ end
+
+ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
+ FrameBytes.header(id, body.bytesize, properties)
+ pos = 0
+ while pos < body.bytesize # split body into multiple frame_max frames
+ len = [frame_max, body.bytesize - pos].min
+ body_part = body.byteslice(pos, len)
+ write_bytes FrameBytes.body(id, body_part)
+ pos += len
+ end
+ @unconfirmed.push @confirm += 1 if @confirm
+ nil
end
- else
- threads = Array.new(worker_threads) do
- Thread.new do
+
+ # Publish a message and block until the message has confirmed it has received it
+ # @param (see #basic_publish)
+ # @return [Boolean] True if the message was successfully published
+ def basic_publish_confirm(body, exchange, routing_key, **properties)
+ confirm_select(no_wait: true)
+ basic_publish(body, exchange, routing_key, **properties)
+ wait_for_confirms
+ end
+
+ # Consume messages from a queue
+ # @param queue [String] Name of the queue to subscribe to
+ # @param tag [String] Custom consumer tag, will be auto assigned by the server if empty
+ # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected)
+ # @param exclusive [Boolean] When true only a single consumer can consume from the queue at a time
+ # @param arguments [Hash] Custom arguments to the consumer
+ # @param worker_threads [Integer] Number of threads processing messages,
+ # 0 means that the thread calling this method will be blocked
+ # @yield [Message] Delivered message from the queue
+ # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
+ # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
+ def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
+ write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
+ tag, = expect(:basic_consume_ok)
+ q = @consumers[tag] = ::Queue.new
+ if worker_threads.zero?
loop do
yield (q.pop || break)
end
+ nil
+ else
+ threads = Array.new(worker_threads) do
+ Thread.new do
+ loop do
+ yield (q.pop || break)
+ end
+ end
+ end
+ [tag, threads]
end
end
- [tag, threads]
- end
- end
- def basic_cancel(consumer_tag, no_wait: false)
- consumer = @consumers.fetch(consumer_tag)
- return if consumer.closed?
+ # Cancel/abort/stop a consumer
+ # @param consumer_tag [String] Tag of the consumer to cancel
+ # @param no_wait [Boolean] Will wait for a confirmation from the server that the consumer is cancelled
+ # @return [nil]
+ def basic_cancel(consumer_tag, no_wait: false)
+ consumer = @consumers.fetch(consumer_tag)
+ return if consumer.closed?
- write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
- expect(:basic_cancel_ok) unless no_wait
- consumer.close
- end
+ write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
+ expect(:basic_cancel_ok) unless no_wait
+ consumer.close
+ nil
+ end
- def basic_qos(prefetch_count, prefetch_size: 0, global: false)
- write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
- expect :basic_qos_ok
- end
+ # Specify how many messages to prefetch for consumers with `no_ack: false`
+ # @param prefetch_count [Integer] Number of messages to maxium keep in flight
+ # @param prefetch_size [Integer] Number of bytes to maxium keep in flight
+ # @param global [Boolean] If true the limit will apply to channel rather than the consumer
+ # @return [nil]
+ def basic_qos(prefetch_count, prefetch_size: 0, global: false)
+ write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
+ expect :basic_qos_ok
+ nil
+ end
- def basic_ack(delivery_tag, multiple: false)
- write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
- end
+ # Acknowledge a message
+ # @param delivery_tag [Integer] The delivery tag of the message to acknowledge
+ # @return [nil]
+ def basic_ack(delivery_tag, multiple: false)
+ write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
+ nil
+ end
- def basic_nack(delivery_tag, multiple: false, requeue: false)
- write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
- end
+ # Negatively acknowledge a message
+ # @param delivery_tag [Integer] The delivery tag of the message to acknowledge
+ # @param multiple [Boolean] Nack all messages up to this message
+ # @param requeue [Boolean] Requeue the message
+ # @return [nil]
+ def basic_nack(delivery_tag, multiple: false, requeue: false)
+ write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
+ nil
+ end
- def basic_reject(delivery_tag, requeue: false)
- write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
- end
+ # Reject a message
+ # @param delivery_tag [Integer] The delivery tag of the message to acknowledge
+ # @param requeue [Boolean] Requeue the message into the queue again
+ # @return [nil]
+ def basic_reject(delivery_tag, requeue: false)
+ write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
+ nil
+ end
- def basic_recover(requeue: false)
- write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
- expect :basic_recover_ok
- end
+ # Recover all the unacknowledge messages
+ # @param requeue [Boolean] If false the currently unack:ed messages will be deliviered to this consumer again,
+ # if false to any consumer
+ # @return [nil]
+ def basic_recover(requeue: false)
+ write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
+ expect :basic_recover_ok
+ nil
+ end
- def confirm_select(no_wait: false)
- return if @confirm
+ # @!endgroup
+ # @!group Confirm
- write_bytes FrameBytes.confirm_select(@id, no_wait)
- expect :confirm_select_ok unless no_wait
- @confirm = 0
- end
+ # Put the channel in confirm mode, each published message will then be confirmed by the server
+ # @param no_wait [Boolean] If false the method will block until the server has confirmed the request
+ # @return [nil]
+ def confirm_select(no_wait: false)
+ return if @confirm
- # Block until all publishes messages are confirmed
- def wait_for_confirms
- return true if @unconfirmed.empty?
+ write_bytes FrameBytes.confirm_select(@id, no_wait)
+ expect :confirm_select_ok unless no_wait
+ @confirm = 0
+ nil
+ end
- case @unconfirmed_empty.pop
- when true then true
- when false then false
- else raise AMQP::Client::ChannelClosedError.new(@id, *@closed)
- end
- end
+ # Block until all publishes messages are confirmed
+ # @return [Boolean] True if all message where positivly acknowledged, false if not
+ def wait_for_confirms
+ return true if @unconfirmed.empty?
- # Called by Connection when received ack/nack from server
- def confirm(args)
- ack_or_nack, delivery_tag, multiple = *args
- loop do
- tag = @unconfirmed.pop(true)
- break if tag == delivery_tag
- next if multiple && tag < delivery_tag
+ case @unconfirmed_empty.pop
+ when true then true
+ when false then false
+ else raise Error::ChannelClosed.new(@id, *@closed)
+ end
+ end
- @unconfirmed << tag # requeue
- rescue ThreadError
- break
- end
- return unless @unconfirmed.empty?
+ # Called by Connection when received ack/nack from server
+ # @api private
+ def confirm(args)
+ ack_or_nack, delivery_tag, multiple = *args
+ loop do
+ tag = @unconfirmed.pop(true)
+ break if tag == delivery_tag
+ next if multiple && tag < delivery_tag
- @unconfirmed_empty.num_waiting.times do
- @unconfirmed_empty << (ack_or_nack == :ack)
- end
- end
+ @unconfirmed << tag # requeue
+ rescue ThreadError
+ break
+ end
+ return unless @unconfirmed.empty?
- def tx_select
- write_bytes FrameBytes.tx_select(@id)
- expect :tx_select_ok
- end
+ @unconfirmed_empty.num_waiting.times do
+ @unconfirmed_empty << (ack_or_nack == :ack)
+ end
+ end
- def tx_commit
- write_bytes FrameBytes.tx_commit(@id)
- expect :tx_commit_ok
- end
+ # @!endgroup
+ # @!group Transaction
- def tx_rollback
- write_bytes FrameBytes.tx_rollback(@id)
- expect :tx_rollback_ok
- end
+ # Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish
+ # @return [nil]
+ def tx_select
+ write_bytes FrameBytes.tx_select(@id)
+ expect :tx_select_ok
+ nil
+ end
- def on_return(&block)
- @on_return = block
- end
+ # Commmit a transaction, requires that the channel is in transaction mode
+ # @return [nil]
+ def tx_commit
+ write_bytes FrameBytes.tx_commit(@id)
+ expect :tx_commit_ok
+ nil
+ end
- def reply(args)
- @replies.push(args)
- end
+ # Rollback a transaction, requires that the channel is in transaction mode
+ # @return [nil]
+ def tx_rollback
+ write_bytes FrameBytes.tx_rollback(@id)
+ expect :tx_rollback_ok
+ nil
+ end
- def message_returned(reply_code, reply_text, exchange, routing_key)
- @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, nil, "")
- end
+ # @!endgroup
- def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
- @next_msg = Message.new(self, delivery_tag, exchange, routing_key, nil, "", redelivered, consumer_tag)
- end
+ # @api private
+ def reply(args)
+ @replies.push(args)
+ end
- def basic_get_empty
- @basic_gets.push :basic_get_empty
- end
+ # @api private
+ def message_returned(reply_code, reply_text, exchange, routing_key)
+ @next_msg = ReturnMessage.new(reply_code, reply_text, exchange, routing_key, nil, "")
+ end
- def header_delivered(body_size, properties)
- @next_msg.properties = properties
- if body_size.zero?
- next_message_finished!
- else
- @next_body = StringIO.new(String.new(capacity: body_size))
- @next_body_size = body_size
- end
- end
+ # @api private
+ def message_delivered(consumer_tag, delivery_tag, redelivered, exchange, routing_key)
+ @next_msg = Message.new(self, delivery_tag, exchange, routing_key, nil, "", redelivered, consumer_tag)
+ end
- def body_delivered(body_part)
- @next_body.write(body_part)
- return unless @next_body.pos == @next_body_size
+ # @api private
+ def basic_get_empty
+ @basic_gets.push :basic_get_empty
+ end
- @next_msg.body = @next_body.string
- next_message_finished!
- end
+ # @api private
+ def header_delivered(body_size, properties)
+ @next_msg.properties = properties
+ if body_size.zero?
+ next_message_finished!
+ else
+ @next_body = StringIO.new(String.new(capacity: body_size))
+ @next_body_size = body_size
+ end
+ end
- def close_consumer(tag)
- @consumers.fetch(tag).close
- end
+ # @api private
+ def body_delivered(body_part)
+ @next_body.write(body_part)
+ return unless @next_body.pos == @next_body_size
- private
+ @next_msg.body = @next_body.string
+ next_message_finished!
+ end
- def next_message_finished!
- next_msg = @next_msg
- if next_msg.is_a? ReturnMessage
- if @on_return
- Thread.new { @on_return.call(next_msg) }
- else
- warn "AMQP-Client message returned: #{msg.inspect}"
+ # @api private
+ def close_consumer(tag)
+ @consumers.fetch(tag).close
end
- elsif next_msg.consumer_tag.nil?
- @basic_gets.push next_msg
- else
- Thread.pass until (consumer = @consumers[next_msg.consumer_tag])
- consumer.push next_msg
- end
- ensure
- @next_msg = @next_body = @next_body_size = nil
- end
- def write_bytes(*bytes)
- raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if @closed
+ private
- @connection.write_bytes(*bytes)
- end
+ def next_message_finished!
+ next_msg = @next_msg
+ if next_msg.is_a? ReturnMessage
+ if @on_return
+ Thread.new { @on_return.call(next_msg) }
+ else
+ warn "AMQP-Client message returned: #{msg.inspect}"
+ end
+ elsif next_msg.consumer_tag.nil?
+ @basic_gets.push next_msg
+ else
+ Thread.pass until (consumer = @consumers[next_msg.consumer_tag])
+ consumer.push next_msg
+ end
+ ensure
+ @next_msg = @next_body = @next_body_size = nil
+ end
- def expect(expected_frame_type)
- frame_type, *args = @replies.pop
- raise AMQP::Client::ChannelClosedError.new(@id, *@closed) if frame_type.nil?
- raise AMQP::Client::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type
+ def write_bytes(*bytes)
+ raise Error::ChannelClosed.new(@id, *@closed) if @closed
- args
+ @connection.write_bytes(*bytes)
+ end
+
+ def expect(expected_frame_type)
+ frame_type, *args = @replies.pop
+ raise Error::ChannelClosed.new(@id, *@closed) if frame_type.nil?
+ raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type
+
+ args
+ end
+ end
end
end
end