lib/bunny/channel.rb in bunny-1.2.0 vs lib/bunny/channel.rb in bunny-1.2.1
- old
+ new
@@ -155,10 +155,13 @@
# @return [Set<Integer>] Set of nacked message indexes that have been nacked
attr_reader :nacked_set
# @return [Hash<String, Bunny::Consumer>] Consumer instances declared on this channel
attr_reader :consumers
+ # @return [Integer] active basic.qos prefetch value
+ attr_reader :prefetch_count
+
DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze
SHORTSTR_LIMIT = 255
# @param [Bunny::Session] connection AMQP 0.9.1 connection
# @param [Integer] id Channel id, pass nil to make Bunny automatically allocate it
@@ -410,12 +413,12 @@
#
# @param [Integer] prefetch_count Prefetch (QoS setting) for this channel
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
- def prefetch(prefetch_count)
- self.basic_qos(prefetch_count, false)
+ def prefetch(count)
+ self.basic_qos(count, false)
end
# Flow control. When set to false, RabbitMQ will stop delivering messages on this
# channel.
#
@@ -534,12 +537,14 @@
opts[:delivery_mode] ||= mode
opts[:content_type] ||= DEFAULT_CONTENT_TYPE
opts[:priority] ||= 0
if @next_publish_seq_no > 0
- @unconfirmed_set.add(@next_publish_seq_no)
- @next_publish_seq_no += 1
+ @unconfirmed_set_mutex.synchronize do
+ @unconfirmed_set.add(@next_publish_seq_no)
+ @next_publish_seq_no += 1
+ end
end
frames = AMQ::Protocol::Basic::Publish.encode(@id,
payload,
opts,
@@ -602,22 +607,22 @@
# @param [Boolean] global (false) Ignored, as it is not supported by RabbitMQ
# @return [AMQ::Protocol::Basic::QosOk] RabbitMQ response
# @see Bunny::Channel#prefetch
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
- def basic_qos(prefetch_count, global = false)
- raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
+ def basic_qos(count, global = false)
+ raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if count < 0
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))
+ @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, count, global))
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
- @prefetch_count = prefetch_count
+ @prefetch_count = count
@last_basic_qos_ok
end
# Redeliver unacknowledged messages
@@ -1319,10 +1324,15 @@
raise_if_continuation_resulted_in_a_channel_error!
@last_tx_rollback_ok
end
+ # @return [Boolean] true if this channel has transactions enabled
+ def using_tx?
+ !!@tx_mode
+ end
+
# @endgroup
# @group Publisher Confirms (confirm.*)
@@ -1330,10 +1340,11 @@
# @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise
# @api public
def using_publisher_confirmations?
@next_publish_seq_no > 0
end
+ alias using_publisher_confirms? using_publisher_confirmations?
# Enables publisher confirms for the channel.
# @return [AMQ::Protocol::Confirm::SelectOk] RabbitMQ response
# @see #wait_for_confirms
# @see #unconfirmed_set
@@ -1679,26 +1690,26 @@
end
end
# @private
def handle_ack_or_nack(delivery_tag, multiple, nack)
- if nack
- cloned_set = @unconfirmed_set.clone
+ @unconfirmed_set_mutex.synchronize do
+ if nack
+ cloned_set = @unconfirmed_set.clone
+ if multiple
+ cloned_set.keep_if { |i| i <= delivery_tag }
+ @nacked_set.merge(cloned_set)
+ else
+ @nacked_set.add(delivery_tag)
+ end
+ end
+
if multiple
- cloned_set.keep_if { |i| i <= delivery_tag }
- @nacked_set.merge(cloned_set)
+ @unconfirmed_set.delete_if { |i| i <= delivery_tag }
else
- @nacked_set.add(delivery_tag)
+ @unconfirmed_set.delete(delivery_tag)
end
- end
- if multiple
- @unconfirmed_set.delete_if { |i| i <= delivery_tag }
- else
- @unconfirmed_set.delete(delivery_tag)
- end
-
- @unconfirmed_set_mutex.synchronize do
@only_acks_received = (@only_acks_received && !nack)
@confirms_continuations.push(true) if @unconfirmed_set.empty?
@confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
end