lib/bunny/channel.rb in bunny-0.9.0.pre2 vs lib/bunny/channel.rb in bunny-0.9.0.pre3

- old
+ new

@@ -1,7 +1,7 @@ require "thread" -require "amq/int_allocator" +require "set" require "bunny/consumer_work_pool" require "bunny/exchange" require "bunny/queue" @@ -16,10 +16,11 @@ # # API # attr_accessor :id, :connection, :status, :work_pool + attr_reader :next_publish_seq_no def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) @connection = connection @id = id || @connection.next_channel_id @@ -31,17 +32,21 @@ @exchanges = Hash.new @consumers = Hash.new @work_pool = work_pool # synchronizes frameset delivery. MK. - @mutex = Mutex.new - @consumer_mutex = Mutex.new + @publishing_mutex = Mutex.new + @consumer_mutex = Mutex.new - @continuations = ::Queue.new - end + @unconfirmed_set_mutex = Mutex.new + @continuations = ::Queue.new + @confirms_continuations = ::Queue.new + @next_publish_seq_no = 0 + end + def open @connection.open_channel(self) @status = :open self @@ -144,10 +149,13 @@ def on_error(&block) @default_error_handler = block end + def using_publisher_confirmations? + @next_publish_seq_no > 0 + end # # Lower-level API, exposes protocol operations as they are defined in the protocol, # without any OO sugar on top, by design. # @@ -163,12 +171,25 @@ exchange end meta = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }. merge(opts) - @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id, payload, meta, exchange_name, routing_key, meta[:mandatory], false, @connection.frame_max), self) + if @next_publish_seq_no > 0 + @unconfirmed_set.add(@next_publish_seq_no) + @next_publish_seq_no += 1 + end + + @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id, + payload, + meta, + exchange_name, + routing_key, + meta[:mandatory], + false, + @connection.frame_max), self) + self end def basic_get(queue, opts = {:ack => true}) raise_if_no_longer_open! @@ -220,11 +241,14 @@ nil end def basic_nack(delivery_tag, requeue, multiple = false) raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id, delivery_tag, requeue, multiple)) + @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id, + delivery_tag, + requeue, + multiple)) nil end def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block) @@ -235,25 +259,58 @@ queue.name else queue end - @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id, queue_name, consumer_tag, false, no_ack, exclusive, false, arguments)) + @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id, + queue_name, + consumer_tag, + false, + no_ack, + exclusive, + false, + arguments)) Bunny::Timer.timeout(1, ClientTimeout) do @last_basic_consume_ok = @continuations.pop end @consumer_mutex.synchronize do - c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments) + # make sure to use consumer tag from basic.consume-ok in case it was + # server-generated + c = Consumer.new(self, queue, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments) c.on_delivery(&block) if block - @consumers[@last_basic_consume_ok.consumer_tag] = c end @last_basic_consume_ok end + def basic_consume_with(consumer) + raise_if_no_longer_open! + maybe_start_consumer_work_pool! + + @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id, + consumer.queue_name, + consumer.consumer_tag, + false, + consumer.no_ack, + consumer.exclusive, + false, + consumer.arguments)) + Bunny::Timer.timeout(1, ClientTimeout) do + @last_basic_consume_ok = @continuations.pop + end + + @consumer_mutex.synchronize do + # update the tag in case it was server-generated + consumer.consumer_tag = @last_basic_consume_ok.consumer_tag + @consumers[@last_basic_consume_ok.consumer_tag] = consumer + end + + @last_basic_consume_ok + end + def basic_cancel(consumer_tag) @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false)) Bunny::Timer.timeout(1, ClientTimeout) do @last_basic_cancel_ok = @continuations.pop @@ -266,22 +323,33 @@ # queue.* def queue_declare(name, opts = {}) raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id, name, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:exclusive, false), opts.fetch(:auto_delete, false), false, opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id, + name, + opts.fetch(:passive, false), + opts.fetch(:durable, false), + opts.fetch(:exclusive, false), + opts.fetch(:auto_delete, false), + false, + opts[:arguments])) @last_queue_declare_ok = @continuations.pop raise_if_continuation_resulted_in_a_channel_error! @last_queue_declare_ok end def queue_delete(name, opts = {}) raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id, name, opts[:if_unused], opts[:if_empty], false)) + @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id, + name, + opts[:if_unused], + opts[:if_empty], + false)) Bunny::Timer.timeout(1, ClientTimeout) do @last_queue_delete_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -308,11 +376,16 @@ exchange.name else exchange end - @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id, name, exchange_name, opts[:routing_key], false, opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id, + name, + exchange_name, + opts[:routing_key], + false, + opts[:arguments])) Bunny::Timer.timeout(1, ClientTimeout) do @last_queue_bind_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -326,11 +399,15 @@ exchange.name else exchange end - @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id, name, exchange_name, opts[:routing_key], opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id, + name, + exchange_name, + opts[:routing_key], + opts[:arguments])) Bunny::Timer.timeout(1, ClientTimeout) do @last_queue_unbind_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -341,11 +418,19 @@ # exchange.* def exchange_declare(name, type, opts = {}) raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id, name, type.to_s, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:auto_delete, false), false, false, opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id, + name, + type.to_s, + opts.fetch(:passive, false), + opts.fetch(:durable, false), + opts.fetch(:auto_delete, false), + false, + false, + opts[:arguments])) Bunny::Timer.timeout(1, ClientTimeout) do @last_exchange_declare_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -353,11 +438,14 @@ end def exchange_delete(name, opts = {}) raise_if_no_longer_open! - @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id, name, opts[:if_unused], false)) + @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id, + name, + opts[:if_unused], + false)) Bunny::Timer.timeout(1, ClientTimeout) do @last_exchange_delete_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -366,22 +454,27 @@ def exchange_bind(source, destination, opts = {}) raise_if_no_longer_open! source_name = if source.respond_to?(:name) - source.name - else - source - end + source.name + else + source + end destination_name = if destination.respond_to?(:name) - destination.name - else - destination - end + destination.name + else + destination + end - @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id, + destination_name, + source_name, + opts[:routing_key], + false, + opts[:arguments])) Bunny::Timer.timeout(1, ClientTimeout) do @last_exchange_bind_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -390,22 +483,27 @@ def exchange_unbind(source, destination, opts = {}) raise_if_no_longer_open! source_name = if source.respond_to?(:name) - source.name - else - source - end + source.name + else + source + end destination_name = if destination.respond_to?(:name) - destination.name - else - destination - end + destination.name + else + destination + end - @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments])) + @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id, + destination_name, + source_name, + opts[:routing_key], + false, + opts[:arguments])) Bunny::Timer.timeout(1, ClientTimeout) do @last_exchange_unbind_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! @@ -467,20 +565,31 @@ # confirm.* def confirm_select raise_if_no_longer_open! + if @next_publish_seq_no == 0 + @confirms_continuations = [] + @unconfirmed_set = Set.new + @next_publish_seq_no = 1 + end + @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false)) Bunny::Timer.timeout(1, ClientTimeout) do @last_confirm_select_ok = @continuations.pop end raise_if_continuation_resulted_in_a_channel_error! - @last_confirm_select_ok end + def wait_for_confirms + @only_acks_received = true + @confirms_continuations.pop + @only_acks_received + end + # # Implementation # def handle_method(method) @@ -510,19 +619,31 @@ @continuations.push(method) when AMQ::Protocol::Channel::FlowOk then @continuations.push(method) when AMQ::Protocol::Basic::ConsumeOk then @continuations.push(method) + when AMQ::Protocol::Basic::Cancel then + if consumer = @consumers[method.consumer_tag] + consumer.handle_cancellation(method) + end + + @consumers.delete(method.consumer_tag) when AMQ::Protocol::Basic::CancelOk then @continuations.push(method) @consumers.delete(method.consumer_tag) when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then @continuations.push(method) when AMQ::Protocol::Tx::SelectOk then @continuations.push(method) when AMQ::Protocol::Confirm::SelectOk then @continuations.push(method) + when AMQ::Protocol::Basic::Ack then + # TODO: implement confirm listeners + handle_ack_or_nack(method.delivery_tag, method.multiple, false) + when AMQ::Protocol::Basic::Nack then + # TODO: implement confirm listeners + handle_ack_or_nack(method.delivery_tag, method.multiple, true) when AMQ::Protocol::Channel::Close then # puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}" closed! @connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id)) @@ -560,10 +681,24 @@ else # TODO: log a warning end end + def handle_ack_or_nack(delivery_tag, multiple, nack) + if multiple + @unconfirmed_set.delete_if { |i| i < delivery_tag } + else + @unconfirmed_set.delete(delivery_tag) + end + + @unconfirmed_set_mutex.synchronize do + @only_acks_received = (@only_acks_received && !nack) + + @confirms_continuations.push(true) if @unconfirmed_set.empty? + end + end + # Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads # that won't do any real work for channels that do not register consumers (e.g. only used for # publishing). MK. def maybe_start_consumer_work_pool! @work_pool.start unless @work_pool.started? @@ -574,10 +709,10 @@ end # Synchronizes given block using this channel's mutex. # @api public def synchronize(&block) - @mutex.synchronize(&block) + @publishing_mutex.synchronize(&block) end def register_queue(queue) @queues[queue.name] = queue end