lib/amqp/client/channel.rb in amqp-client-1.0.1 vs lib/amqp/client/channel.rb in amqp-client-1.0.2

- old
+ new

@@ -53,33 +53,34 @@ # @return [nil] def close(reason: "", code: 200) return if @closed write_bytes FrameBytes.channel_close(@id, reason, code) - @closed = [code, reason] + @closed = [:channel, code, reason] expect :channel_close_ok @replies.close @basic_gets.close @unconfirmed_empty.close @consumers.each_value(&:close) nil end - # Called when channel is closed by server + # Called when channel is closed by broker + # @param level [Symbol] :connection or :channel # @return [nil] # @api private - def closed!(code, reason, classid, methodid) - @closed = [code, reason, classid, methodid] + def closed!(level, code, reason, classid, methodid) + @closed = [level, code, reason, classid, methodid] @replies.close @basic_gets.close @unconfirmed_empty.close @consumers.each_value(&:close) nil end # Handle returned messages in this block. If not set the message will just be logged to STDERR - # @yield [ReturnMessage] Messages returned by the server when a publish has failed + # @yield [ReturnMessage] Messages returned by the broker when a publish has failed # @return nil def on_return(&block) @on_return = block nil end @@ -103,11 +104,11 @@ end # Delete an exchange # @param name [String] Name of the exchange # @param if_unused [Boolean] If true raise an exception if queues/exchanges is bound to this exchange - # @param no_wait [Boolean] If true don't wait for a server confirmation + # @param no_wait [Boolean] If true don't wait for a broker confirmation # @return [nil] def exchange_delete(name, if_unused: false, no_wait: false) write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait) expect :exchange_delete_ok unless no_wait nil @@ -172,11 +173,11 @@ # Delete a queue # @param name [String] Name of the queue # @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise # @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise - # @param no_wait [Boolean] Don't wait for a server confirmation if true + # @param no_wait [Boolean] Don't wait for a broker confirmation if true # @return [Integer] Number of messages in queue when deleted # @return [nil] If no_wait was set true def queue_delete(name, if_unused: false, if_empty: false, no_wait: false) write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait) message_count, = expect :queue_delete unless no_wait @@ -195,11 +196,11 @@ nil end # Purge a queue # @param name [String] Name of the queue - # @param no_wait [Boolean] Don't wait for a server confirmation if true + # @param no_wait [Boolean] Don't wait for a broker confirmation if true # @return [nil] def queue_purge(name, no_wait: false) write_bytes FrameBytes.queue_purge(@id, name, no_wait) expect :queue_purge_ok unless no_wait nil @@ -227,19 +228,21 @@ def basic_get(queue_name, no_ack: true) write_bytes FrameBytes.basic_get(@id, queue_name, no_ack) case (msg = @basic_gets.pop) when Message then msg when :basic_get_empty then nil - when nil then raise Error::ChannelClosed.new(@id, *@closed) + when nil then raise Error::Closed.new(@id, *@closed) end end # Publishes a message to an exchange # @param body [String] The body, can be a string or a byte array # @param exchange [String] Name of the exchange to publish to # @param routing_key [String] The routing key that the exchange might use to route the message to a queue # @param properties [Properties] + # @option properties [Boolean] mandatory The message will be returned if the message can't be routed to a queue + # @option properties [Boolean] persistent Same as delivery_mode: 2 # @option properties [String] content_type Content type of the message body # @option properties [String] content_encoding Content encoding of the body # @option properties [Hash<String, Object>] headers Custom headers # @option properties [Integer] delivery_mode 2 for persisted message, transient messages for all other values # @option properties [Integer] priority A priority of the message (between 0 and 255) @@ -251,19 +254,19 @@ # @option properties [String] type Can indicate what kind of message this is # @option properties [String] user_id Can be used to verify that this is the user that published the message # @option properties [String] app_id Can be used to indicates which app that generated the message # @return [nil] def basic_publish(body, exchange, routing_key, **properties) - frame_max = @connection.frame_max - 8 + body_max = @connection.frame_max - 8 id = @id mandatory = properties.delete(:mandatory) || false case properties.delete(:persistent) when true then properties[:delivery_mode] = 2 when false then properties[:delivery_mode] = 1 end - if body.bytesize.between?(1, frame_max) + if body.bytesize.between?(1, body_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) @unconfirmed.push @confirm += 1 if @confirm return @@ -271,36 +274,39 @@ write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties) pos = 0 while pos < body.bytesize # split body into multiple frame_max frames - len = [frame_max, body.bytesize - pos].min + len = [body_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) write_bytes FrameBytes.body(id, body_part) pos += len end @unconfirmed.push @confirm += 1 if @confirm nil end # Publish a message and block until the message has confirmed it has received it # @param (see #basic_publish) + # @option (see #basic_publish) # @return [Boolean] True if the message was successfully published + # @raise (see #basic_publish) def basic_publish_confirm(body, exchange, routing_key, **properties) confirm_select(no_wait: true) basic_publish(body, exchange, routing_key, **properties) wait_for_confirms end # Consume messages from a queue # @param queue [String] Name of the queue to subscribe to - # @param tag [String] Custom consumer tag, will be auto assigned by the server if empty + # @param tag [String] Custom consumer tag, will be auto assigned by the broker if empty. + # Has to be uniqe among this channel's consumers only # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) # @param exclusive [Boolean] When true only a single consumer can consume from the queue at a time - # @param arguments [Hash] Custom arguments to the consumer + # @param arguments [Hash] Custom arguments for the consumer # @param worker_threads [Integer] Number of threads processing messages, - # 0 means that the thread calling this method will be blocked + # 0 means that the thread calling this method will process the messages and thus this method will block # @yield [Message] Delivered message from the queue # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments) @@ -323,11 +329,11 @@ end end # Cancel/abort/stop a consumer # @param consumer_tag [String] Tag of the consumer to cancel - # @param no_wait [Boolean] Will wait for a confirmation from the server that the consumer is cancelled + # @param no_wait [Boolean] Will wait for a confirmation from the broker that the consumer is cancelled # @return [nil] def basic_cancel(consumer_tag, no_wait: false) consumer = @consumers.fetch(consumer_tag) return if consumer.closed? @@ -375,23 +381,23 @@ nil end # Recover all the unacknowledge messages # @param requeue [Boolean] If false the currently unack:ed messages will be deliviered to this consumer again, - # if false to any consumer + # if true to any consumer # @return [nil] def basic_recover(requeue: false) write_bytes FrameBytes.basic_recover(@id, requeue: requeue) expect :basic_recover_ok nil end # @!endgroup # @!group Confirm - # Put the channel in confirm mode, each published message will then be confirmed by the server - # @param no_wait [Boolean] If false the method will block until the server has confirmed the request + # Put the channel in confirm mode, each published message will then be confirmed by the broker + # @param no_wait [Boolean] If false the method will block until the broker has confirmed the request # @return [nil] def confirm_select(no_wait: false) return if @confirm write_bytes FrameBytes.confirm_select(@id, no_wait) @@ -406,15 +412,15 @@ return true if @unconfirmed.empty? case @unconfirmed_empty.pop when true then true when false then false - else raise Error::ChannelClosed.new(@id, *@closed) + else raise Error::Closed.new(@id, *@closed) end end - # Called by Connection when received ack/nack from server + # Called by Connection when received ack/nack from broker # @api private def confirm(args) ack_or_nack, delivery_tag, multiple = *args loop do tag = @unconfirmed.pop(true) @@ -525,17 +531,17 @@ ensure @next_msg = @next_body = @next_body_size = nil end def write_bytes(*bytes) - raise Error::ChannelClosed.new(@id, *@closed) if @closed + raise Error::Closed.new(@id, *@closed) if @closed @connection.write_bytes(*bytes) end def expect(expected_frame_type) frame_type, *args = @replies.pop - raise Error::ChannelClosed.new(@id, *@closed) if frame_type.nil? + raise Error::Closed.new(@id, *@closed) if frame_type.nil? raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type args end end