lib/bunny/channel.rb in bunny-0.9.0.pre2 vs lib/bunny/channel.rb in bunny-0.9.0.pre3
- old
+ new
@@ -1,7 +1,7 @@
require "thread"
-require "amq/int_allocator"
+require "set"
require "bunny/consumer_work_pool"
require "bunny/exchange"
require "bunny/queue"
@@ -16,10 +16,11 @@
#
# API
#
attr_accessor :id, :connection, :status, :work_pool
+ attr_reader :next_publish_seq_no
def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@connection = connection
@id = id || @connection.next_channel_id
@@ -31,17 +32,21 @@
@exchanges = Hash.new
@consumers = Hash.new
@work_pool = work_pool
# synchronizes frameset delivery. MK.
- @mutex = Mutex.new
- @consumer_mutex = Mutex.new
+ @publishing_mutex = Mutex.new
+ @consumer_mutex = Mutex.new
- @continuations = ::Queue.new
- end
+ @unconfirmed_set_mutex = Mutex.new
+ @continuations = ::Queue.new
+ @confirms_continuations = ::Queue.new
+ @next_publish_seq_no = 0
+ end
+
def open
@connection.open_channel(self)
@status = :open
self
@@ -144,10 +149,13 @@
def on_error(&block)
@default_error_handler = block
end
+ def using_publisher_confirmations?
+ @next_publish_seq_no > 0
+ end
#
# Lower-level API, exposes protocol operations as they are defined in the protocol,
# without any OO sugar on top, by design.
#
@@ -163,12 +171,25 @@
exchange
end
meta = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.
merge(opts)
- @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id, payload, meta, exchange_name, routing_key, meta[:mandatory], false, @connection.frame_max), self)
+ if @next_publish_seq_no > 0
+ @unconfirmed_set.add(@next_publish_seq_no)
+ @next_publish_seq_no += 1
+ end
+
+ @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id,
+ payload,
+ meta,
+ exchange_name,
+ routing_key,
+ meta[:mandatory],
+ false,
+ @connection.frame_max), self)
+
self
end
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!
@@ -220,11 +241,14 @@
nil
end
def basic_nack(delivery_tag, requeue, multiple = false)
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id, delivery_tag, requeue, multiple))
+ @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
+ delivery_tag,
+ requeue,
+ multiple))
nil
end
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
@@ -235,25 +259,58 @@
queue.name
else
queue
end
- @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id, queue_name, consumer_tag, false, no_ack, exclusive, false, arguments))
+ @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
+ queue_name,
+ consumer_tag,
+ false,
+ no_ack,
+ exclusive,
+ false,
+ arguments))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_basic_consume_ok = @continuations.pop
end
@consumer_mutex.synchronize do
- c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
+ # make sure to use consumer tag from basic.consume-ok in case it was
+ # server-generated
+ c = Consumer.new(self, queue, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments)
c.on_delivery(&block) if block
-
@consumers[@last_basic_consume_ok.consumer_tag] = c
end
@last_basic_consume_ok
end
+ def basic_consume_with(consumer)
+ raise_if_no_longer_open!
+ maybe_start_consumer_work_pool!
+
+ @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
+ consumer.queue_name,
+ consumer.consumer_tag,
+ false,
+ consumer.no_ack,
+ consumer.exclusive,
+ false,
+ consumer.arguments))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_basic_consume_ok = @continuations.pop
+ end
+
+ @consumer_mutex.synchronize do
+ # update the tag in case it was server-generated
+ consumer.consumer_tag = @last_basic_consume_ok.consumer_tag
+ @consumers[@last_basic_consume_ok.consumer_tag] = consumer
+ end
+
+ @last_basic_consume_ok
+ end
+
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_basic_cancel_ok = @continuations.pop
@@ -266,22 +323,33 @@
# queue.*
def queue_declare(name, opts = {})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id, name, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:exclusive, false), opts.fetch(:auto_delete, false), false, opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id,
+ name,
+ opts.fetch(:passive, false),
+ opts.fetch(:durable, false),
+ opts.fetch(:exclusive, false),
+ opts.fetch(:auto_delete, false),
+ false,
+ opts[:arguments]))
@last_queue_declare_ok = @continuations.pop
raise_if_continuation_resulted_in_a_channel_error!
@last_queue_declare_ok
end
def queue_delete(name, opts = {})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id, name, opts[:if_unused], opts[:if_empty], false))
+ @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id,
+ name,
+ opts[:if_unused],
+ opts[:if_empty],
+ false))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_queue_delete_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -308,11 +376,16 @@
exchange.name
else
exchange
end
- @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id, name, exchange_name, opts[:routing_key], false, opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id,
+ name,
+ exchange_name,
+ opts[:routing_key],
+ false,
+ opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_queue_bind_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -326,11 +399,15 @@
exchange.name
else
exchange
end
- @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id, name, exchange_name, opts[:routing_key], opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id,
+ name,
+ exchange_name,
+ opts[:routing_key],
+ opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_queue_unbind_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -341,11 +418,19 @@
# exchange.*
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id, name, type.to_s, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:auto_delete, false), false, false, opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id,
+ name,
+ type.to_s,
+ opts.fetch(:passive, false),
+ opts.fetch(:durable, false),
+ opts.fetch(:auto_delete, false),
+ false,
+ false,
+ opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_exchange_declare_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -353,11 +438,14 @@
end
def exchange_delete(name, opts = {})
raise_if_no_longer_open!
- @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id, name, opts[:if_unused], false))
+ @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id,
+ name,
+ opts[:if_unused],
+ false))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_exchange_delete_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -366,22 +454,27 @@
def exchange_bind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
- source.name
- else
- source
- end
+ source.name
+ else
+ source
+ end
destination_name = if destination.respond_to?(:name)
- destination.name
- else
- destination
- end
+ destination.name
+ else
+ destination
+ end
- @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id,
+ destination_name,
+ source_name,
+ opts[:routing_key],
+ false,
+ opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_exchange_bind_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -390,22 +483,27 @@
def exchange_unbind(source, destination, opts = {})
raise_if_no_longer_open!
source_name = if source.respond_to?(:name)
- source.name
- else
- source
- end
+ source.name
+ else
+ source
+ end
destination_name = if destination.respond_to?(:name)
- destination.name
- else
- destination
- end
+ destination.name
+ else
+ destination
+ end
- @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments]))
+ @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id,
+ destination_name,
+ source_name,
+ opts[:routing_key],
+ false,
+ opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_exchange_unbind_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
@@ -467,20 +565,31 @@
# confirm.*
def confirm_select
raise_if_no_longer_open!
+ if @next_publish_seq_no == 0
+ @confirms_continuations = []
+ @unconfirmed_set = Set.new
+ @next_publish_seq_no = 1
+ end
+
@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timer.timeout(1, ClientTimeout) do
@last_confirm_select_ok = @continuations.pop
end
raise_if_continuation_resulted_in_a_channel_error!
-
@last_confirm_select_ok
end
+ def wait_for_confirms
+ @only_acks_received = true
+ @confirms_continuations.pop
+ @only_acks_received
+ end
+
#
# Implementation
#
def handle_method(method)
@@ -510,19 +619,31 @@
@continuations.push(method)
when AMQ::Protocol::Channel::FlowOk then
@continuations.push(method)
when AMQ::Protocol::Basic::ConsumeOk then
@continuations.push(method)
+ when AMQ::Protocol::Basic::Cancel then
+ if consumer = @consumers[method.consumer_tag]
+ consumer.handle_cancellation(method)
+ end
+
+ @consumers.delete(method.consumer_tag)
when AMQ::Protocol::Basic::CancelOk then
@continuations.push(method)
@consumers.delete(method.consumer_tag)
when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then
@continuations.push(method)
when AMQ::Protocol::Tx::SelectOk then
@continuations.push(method)
when AMQ::Protocol::Confirm::SelectOk then
@continuations.push(method)
+ when AMQ::Protocol::Basic::Ack then
+ # TODO: implement confirm listeners
+ handle_ack_or_nack(method.delivery_tag, method.multiple, false)
+ when AMQ::Protocol::Basic::Nack then
+ # TODO: implement confirm listeners
+ handle_ack_or_nack(method.delivery_tag, method.multiple, true)
when AMQ::Protocol::Channel::Close then
# puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}"
closed!
@connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id))
@@ -560,10 +681,24 @@
else
# TODO: log a warning
end
end
+ def handle_ack_or_nack(delivery_tag, multiple, nack)
+ if multiple
+ @unconfirmed_set.delete_if { |i| i < delivery_tag }
+ else
+ @unconfirmed_set.delete(delivery_tag)
+ end
+
+ @unconfirmed_set_mutex.synchronize do
+ @only_acks_received = (@only_acks_received && !nack)
+
+ @confirms_continuations.push(true) if @unconfirmed_set.empty?
+ end
+ end
+
# Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
# that won't do any real work for channels that do not register consumers (e.g. only used for
# publishing). MK.
def maybe_start_consumer_work_pool!
@work_pool.start unless @work_pool.started?
@@ -574,10 +709,10 @@
end
# Synchronizes given block using this channel's mutex.
# @api public
def synchronize(&block)
- @mutex.synchronize(&block)
+ @publishing_mutex.synchronize(&block)
end
def register_queue(queue)
@queues[queue.name] = queue
end