lib/bunny/channel.rb in bunny-1.2.0 vs lib/bunny/channel.rb in bunny-1.2.1

- old
+ new

@@ -155,10 +155,13 @@ # @return [Set<Integer>] Set of nacked message indexes that have been nacked attr_reader :nacked_set # @return [Hash<String, Bunny::Consumer>] Consumer instances declared on this channel attr_reader :consumers + # @return [Integer] active basic.qos prefetch value + attr_reader :prefetch_count + DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze SHORTSTR_LIMIT = 255 # @param [Bunny::Session] connection AMQP 0.9.1 connection # @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it @@ -410,12 +413,12 @@ # # @param [Integer] prefetch_count Prefetch (QoS setting) for this channel # @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide # @api public - def prefetch(prefetch_count) - self.basic_qos(prefetch_count, false) + def prefetch(count) + self.basic_qos(count, false) end # Flow control. When set to false, RabbitMQ will stop delivering messages on this # channel. # @@ -534,12 +537,14 @@ opts[:delivery_mode] ||= mode opts[:content_type] ||= DEFAULT_CONTENT_TYPE opts[:priority] ||= 0 if @next_publish_seq_no > 0 - @unconfirmed_set.add(@next_publish_seq_no) - @next_publish_seq_no += 1 + @unconfirmed_set_mutex.synchronize do + @unconfirmed_set.add(@next_publish_seq_no) + @next_publish_seq_no += 1 + end end frames = AMQ::Protocol::Basic::Publish.encode(@id, payload, opts, @@ -602,22 +607,22 @@ # @param [Boolean] global (false) Ignored, as it is not supported by RabbitMQ # @return [AMQ::Protocol::Basic::QosOk] RabbitMQ response # @see Bunny::Channel#prefetch # @see http://rubybunny.info/articles/queues.html Queues and Consumers guide # @api public - def basic_qos(prefetch_count, global = false) - raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0 + def basic_qos(count, global = false) + raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if count < 0 raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global)) + @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, count, global)) Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do @last_basic_qos_ok = wait_on_continuations end raise_if_continuation_resulted_in_a_channel_error! - @prefetch_count = prefetch_count + @prefetch_count = count @last_basic_qos_ok end # Redeliver unacknowledged messages @@ -1319,10 +1324,15 @@ raise_if_continuation_resulted_in_a_channel_error! @last_tx_rollback_ok end + # @return [Boolean] true if this channel has transactions enabled + def using_tx? + !!@tx_mode + end + # @endgroup # @group Publisher Confirms (confirm.*) @@ -1330,10 +1340,11 @@ # @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise # @api public def using_publisher_confirmations? @next_publish_seq_no > 0 end + alias using_publisher_confirms? using_publisher_confirmations? # Enables publisher confirms for the channel. # @return [AMQ::Protocol::Confirm::SelectOk] RabbitMQ response # @see #wait_for_confirms # @see #unconfirmed_set @@ -1679,26 +1690,26 @@ end end # @private def handle_ack_or_nack(delivery_tag, multiple, nack) - if nack - cloned_set = @unconfirmed_set.clone + @unconfirmed_set_mutex.synchronize do + if nack + cloned_set = @unconfirmed_set.clone + if multiple + cloned_set.keep_if { |i| i <= delivery_tag } + @nacked_set.merge(cloned_set) + else + @nacked_set.add(delivery_tag) + end + end + if multiple - cloned_set.keep_if { |i| i <= delivery_tag } - @nacked_set.merge(cloned_set) + @unconfirmed_set.delete_if { |i| i <= delivery_tag } else - @nacked_set.add(delivery_tag) + @unconfirmed_set.delete(delivery_tag) end - end - if multiple - @unconfirmed_set.delete_if { |i| i <= delivery_tag } - else - @unconfirmed_set.delete(delivery_tag) - end - - @unconfirmed_set_mutex.synchronize do @only_acks_received = (@only_acks_received && !nack) @confirms_continuations.push(true) if @unconfirmed_set.empty? @confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback end