lib/amqp/client/channel.rb in amqp-client-1.0.1 vs lib/amqp/client/channel.rb in amqp-client-1.0.2
- old
+ new
@@ -53,33 +53,34 @@
# @return [nil]
def close(reason: "", code: 200)
return if @closed
write_bytes FrameBytes.channel_close(@id, reason, code)
- @closed = [code, reason]
+ @closed = [:channel, code, reason]
expect :channel_close_ok
@replies.close
@basic_gets.close
@unconfirmed_empty.close
@consumers.each_value(&:close)
nil
end
- # Called when channel is closed by server
+ # Called when channel is closed by broker
+ # @param level [Symbol] :connection or :channel
# @return [nil]
# @api private
- def closed!(code, reason, classid, methodid)
- @closed = [code, reason, classid, methodid]
+ def closed!(level, code, reason, classid, methodid)
+ @closed = [level, code, reason, classid, methodid]
@replies.close
@basic_gets.close
@unconfirmed_empty.close
@consumers.each_value(&:close)
nil
end
# Handle returned messages in this block. If not set the message will just be logged to STDERR
- # @yield [ReturnMessage] Messages returned by the server when a publish has failed
+ # @yield [ReturnMessage] Messages returned by the broker when a publish has failed
# @return nil
def on_return(&block)
@on_return = block
nil
end
@@ -103,11 +104,11 @@
end
# Delete an exchange
# @param name [String] Name of the exchange
# @param if_unused [Boolean] If true raise an exception if queues/exchanges is bound to this exchange
- # @param no_wait [Boolean] If true don't wait for a server confirmation
+ # @param no_wait [Boolean] If true don't wait for a broker confirmation
# @return [nil]
def exchange_delete(name, if_unused: false, no_wait: false)
write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
expect :exchange_delete_ok unless no_wait
nil
@@ -172,11 +173,11 @@
# Delete a queue
# @param name [String] Name of the queue
# @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise
# @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise
- # @param no_wait [Boolean] Don't wait for a server confirmation if true
+ # @param no_wait [Boolean] Don't wait for a broker confirmation if true
# @return [Integer] Number of messages in queue when deleted
# @return [nil] If no_wait was set true
def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
message_count, = expect :queue_delete unless no_wait
@@ -195,11 +196,11 @@
nil
end
# Purge a queue
# @param name [String] Name of the queue
- # @param no_wait [Boolean] Don't wait for a server confirmation if true
+ # @param no_wait [Boolean] Don't wait for a broker confirmation if true
# @return [nil]
def queue_purge(name, no_wait: false)
write_bytes FrameBytes.queue_purge(@id, name, no_wait)
expect :queue_purge_ok unless no_wait
nil
@@ -227,19 +228,21 @@
def basic_get(queue_name, no_ack: true)
write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
case (msg = @basic_gets.pop)
when Message then msg
when :basic_get_empty then nil
- when nil then raise Error::ChannelClosed.new(@id, *@closed)
+ when nil then raise Error::Closed.new(@id, *@closed)
end
end
# Publishes a message to an exchange
# @param body [String] The body, can be a string or a byte array
# @param exchange [String] Name of the exchange to publish to
# @param routing_key [String] The routing key that the exchange might use to route the message to a queue
# @param properties [Properties]
+ # @option properties [Boolean] mandatory The message will be returned if the message can't be routed to a queue
+ # @option properties [Boolean] persistent Same as delivery_mode: 2
# @option properties [String] content_type Content type of the message body
# @option properties [String] content_encoding Content encoding of the body
# @option properties [Hash<String, Object>] headers Custom headers
# @option properties [Integer] delivery_mode 2 for persisted message, transient messages for all other values
# @option properties [Integer] priority A priority of the message (between 0 and 255)
@@ -251,19 +254,19 @@
# @option properties [String] type Can indicate what kind of message this is
# @option properties [String] user_id Can be used to verify that this is the user that published the message
# @option properties [String] app_id Can be used to indicates which app that generated the message
# @return [nil]
def basic_publish(body, exchange, routing_key, **properties)
- frame_max = @connection.frame_max - 8
+ body_max = @connection.frame_max - 8
id = @id
mandatory = properties.delete(:mandatory) || false
case properties.delete(:persistent)
when true then properties[:delivery_mode] = 2
when false then properties[:delivery_mode] = 1
end
- if body.bytesize.between?(1, frame_max)
+ if body.bytesize.between?(1, body_max)
write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties),
FrameBytes.body(id, body)
@unconfirmed.push @confirm += 1 if @confirm
return
@@ -271,36 +274,39 @@
write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
FrameBytes.header(id, body.bytesize, properties)
pos = 0
while pos < body.bytesize # split body into multiple frame_max frames
- len = [frame_max, body.bytesize - pos].min
+ len = [body_max, body.bytesize - pos].min
body_part = body.byteslice(pos, len)
write_bytes FrameBytes.body(id, body_part)
pos += len
end
@unconfirmed.push @confirm += 1 if @confirm
nil
end
# Publish a message and block until the message has confirmed it has received it
# @param (see #basic_publish)
+ # @option (see #basic_publish)
# @return [Boolean] True if the message was successfully published
+ # @raise (see #basic_publish)
def basic_publish_confirm(body, exchange, routing_key, **properties)
confirm_select(no_wait: true)
basic_publish(body, exchange, routing_key, **properties)
wait_for_confirms
end
# Consume messages from a queue
# @param queue [String] Name of the queue to subscribe to
- # @param tag [String] Custom consumer tag, will be auto assigned by the server if empty
+ # @param tag [String] Custom consumer tag, will be auto assigned by the broker if empty.
+ # Has to be uniqe among this channel's consumers only
# @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected)
# @param exclusive [Boolean] When true only a single consumer can consume from the queue at a time
- # @param arguments [Hash] Custom arguments to the consumer
+ # @param arguments [Hash] Custom arguments for the consumer
# @param worker_threads [Integer] Number of threads processing messages,
- # 0 means that the thread calling this method will be blocked
+ # 0 means that the thread calling this method will process the messages and thus this method will block
# @yield [Message] Delivered message from the queue
# @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
# @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
@@ -323,11 +329,11 @@
end
end
# Cancel/abort/stop a consumer
# @param consumer_tag [String] Tag of the consumer to cancel
- # @param no_wait [Boolean] Will wait for a confirmation from the server that the consumer is cancelled
+ # @param no_wait [Boolean] Will wait for a confirmation from the broker that the consumer is cancelled
# @return [nil]
def basic_cancel(consumer_tag, no_wait: false)
consumer = @consumers.fetch(consumer_tag)
return if consumer.closed?
@@ -375,23 +381,23 @@
nil
end
# Recover all the unacknowledge messages
# @param requeue [Boolean] If false the currently unack:ed messages will be deliviered to this consumer again,
- # if false to any consumer
+ # if true to any consumer
# @return [nil]
def basic_recover(requeue: false)
write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
expect :basic_recover_ok
nil
end
# @!endgroup
# @!group Confirm
- # Put the channel in confirm mode, each published message will then be confirmed by the server
- # @param no_wait [Boolean] If false the method will block until the server has confirmed the request
+ # Put the channel in confirm mode, each published message will then be confirmed by the broker
+ # @param no_wait [Boolean] If false the method will block until the broker has confirmed the request
# @return [nil]
def confirm_select(no_wait: false)
return if @confirm
write_bytes FrameBytes.confirm_select(@id, no_wait)
@@ -406,15 +412,15 @@
return true if @unconfirmed.empty?
case @unconfirmed_empty.pop
when true then true
when false then false
- else raise Error::ChannelClosed.new(@id, *@closed)
+ else raise Error::Closed.new(@id, *@closed)
end
end
- # Called by Connection when received ack/nack from server
+ # Called by Connection when received ack/nack from broker
# @api private
def confirm(args)
ack_or_nack, delivery_tag, multiple = *args
loop do
tag = @unconfirmed.pop(true)
@@ -525,17 +531,17 @@
ensure
@next_msg = @next_body = @next_body_size = nil
end
def write_bytes(*bytes)
- raise Error::ChannelClosed.new(@id, *@closed) if @closed
+ raise Error::Closed.new(@id, *@closed) if @closed
@connection.write_bytes(*bytes)
end
def expect(expected_frame_type)
frame_type, *args = @replies.pop
- raise Error::ChannelClosed.new(@id, *@closed) if frame_type.nil?
+ raise Error::Closed.new(@id, *@closed) if frame_type.nil?
raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type
args
end
end