lib/bunny/channel.rb in bunny-0.8.0 vs lib/bunny/channel.rb in bunny-0.9.0.pre1
- old
+ new
@@ -1,39 +1,635 @@
-# encoding: utf-8
+require "thread"
+require "amq/int_allocator"
+require "bunny/consumer_work_pool"
+
+require "bunny/exchange"
+require "bunny/queue"
+require "bunny/message_metadata"
+
module Bunny
- class Channel < Qrack::Channel
+ class Channel
- def initialize(client)
- super
+ #
+ # API
+ #
+
+ attr_accessor :id, :connection, :status, :work_pool
+
+
+ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
+ @connection = connection
+ @id = id || @connection.next_channel_id
+ @status = :opening
+
+ @connection.register_channel(self)
+
+ @queues = Hash.new
+ @exchanges = Hash.new
+ @consumers = Hash.new
+ @work_pool = work_pool
+
+ # synchronizes frameset delivery. MK.
+ @mutex = Mutex.new
+ @consumer_mutex = Mutex.new
+
+ @continuations = ::Queue.new
end
+
def open
- client.channel = self
- client.send_frame(Qrack::Protocol::Channel::Open.new)
+ @connection.open_channel(self)
+ @status = :open
- method = client.next_method
+ self
+ end
- client.check_response(method, Qrack::Protocol::Channel::OpenOk, "Cannot open channel #{number}")
+ def close
+ @connection.close_channel(self)
+ closed!
+ end
- @active = true
- :open_ok
+ def open?
+ @status == :open
end
- def close
- client.channel = self
- client.send_frame(Qrack::Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0))
+ def closed?
+ @status == :closed
+ end
- method = client.next_method
+ def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
+ q = find_queue(name) || Bunny::Queue.new(self, name, opts)
- client.check_response(method, Qrack::Protocol::Channel::CloseOk, "Error closing channel #{number}")
+ register_queue(q)
+ end
- @active = false
- :close_ok
+
+ #
+ # Backwards compatibility with 0.8.0
+ #
+
+ def number
+ self.id
end
- def open?
- active
+ def active
+ @active
end
+ def client
+ @connection
+ end
+
+ def frame_size
+ @connection.frame_max
+ end
+
+
+ #
+ # Higher-level API, similar to amqp gem
+ #
+
+ def fanout(name, opts = {})
+ Exchange.new(self, :fanout, name, opts)
+ end
+
+ def direct(name, opts = {})
+ Exchange.new(self, :direct, name, opts)
+ end
+
+ def topic(name, opts = {})
+ Exchange.new(self, :topic, name, opts)
+ end
+
+ def headers(name, opts = {})
+ Exchange.new(self, :headers, name, opts)
+ end
+
+ def default_exchange
+ self.direct("", :no_declare => true)
+ end
+
+ def prefetch(prefetch_count)
+ self.basic_qos(prefetch_count, false)
+ end
+
+ def flow(active)
+ channel_flow(active)
+ end
+
+ def recover(ignored = true)
+ # RabbitMQ only supports basic.recover with requeue = true
+ basic_recover(true)
+ end
+
+ def reject(delivery_tag, requeue = false)
+ basic_reject(delivery_tag, requeue)
+ end
+
+ def ack(delivery_tag, multiple)
+ basic_ack(delivery_tag, multiple)
+ end
+ alias acknowledge ack
+
+ def nack(delivery_tag, requeue, multiple = false)
+ basic_nack(delivery_tag, requeue, multiple)
+ end
+
+ def on_error(&block)
+ @default_error_handler = block
+ end
+
+
+ #
+ # Lower-level API, exposes protocol operations as they are defined in the protocol,
+ # without any OO sugar on top, by design.
+ #
+
+ # basic.*
+
+ def basic_publish(payload, exchange, routing_key, opts = {})
+ raise_if_no_longer_open!
+
+ exchange_name = if exchange.respond_to?(:name)
+ exchange.name
+ else
+ exchange
+ end
+
+ meta = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.
+ merge(opts)
+ @connection.send_frameset(AMQ::Protocol::Basic::Publish.encode(@id, payload, meta, exchange_name, routing_key, meta[:mandatory], false, (frame_size || @connection.frame_max)), self)
+
+ self
+ end
+
+ def basic_get(queue, opts = {:ack => true})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !opts[:ack]))
+ @last_basic_get_response = @continuations.pop
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_basic_get_response
+ end
+
+ def basic_qos(prefetch_count, global = false)
+ raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))
+
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_basic_qos_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_basic_qos_ok
+ end
+
+ def basic_recover(requeue)
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_basic_recover_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_basic_recover_ok
+ end
+
+ def basic_reject(delivery_tag, requeue)
+ raise_if_no_longer_open!
+ @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
+
+ nil
+ end
+
+ def basic_ack(delivery_tag, multiple)
+ raise_if_no_longer_open!
+ @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
+
+ nil
+ end
+
+ def basic_nack(delivery_tag, requeue, multiple = false)
+ raise_if_no_longer_open!
+ @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id, delivery_tag, requeue, multiple))
+
+ nil
+ end
+
+ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
+ raise_if_no_longer_open!
+ maybe_start_consumer_work_pool!
+
+ queue_name = if queue.respond_to?(:name)
+ queue.name
+ else
+ queue
+ end
+
+ @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id, queue_name, consumer_tag, false, no_ack, exclusive, false, arguments))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_basic_consume_ok = @continuations.pop
+ end
+
+ @consumer_mutex.synchronize do
+ c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
+ c.on_delivery(&block) if block
+
+ @consumers[@last_basic_consume_ok.consumer_tag] = c
+ end
+
+ @last_basic_consume_ok
+ end
+
+ def basic_cancel(consumer_tag)
+ @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))
+
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_basic_cancel_ok = @continuations.pop
+ end
+
+ @last_basic_cancel_ok
+ end
+
+
+ # queue.*
+
+ def queue_declare(name, opts = {})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Queue::Declare.encode(@id, name, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:exclusive, false), opts.fetch(:auto_delete, false), false, opts[:arguments]))
+ @last_queue_declare_ok = @continuations.pop
+
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_queue_declare_ok
+ end
+
+ def queue_delete(name, opts = {})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id, name, opts[:if_unused], opts[:if_empty], false))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_queue_delete_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_queue_delete_ok
+ end
+
+ def queue_purge(name, opts = {})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))
+
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_queue_purge_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_queue_purge_ok
+ end
+
+ def queue_bind(name, exchange, opts = {})
+ raise_if_no_longer_open!
+
+ exchange_name = if exchange.respond_to?(:name)
+ exchange.name
+ else
+ exchange
+ end
+
+ @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id, name, exchange_name, opts[:routing_key], false, opts[:arguments]))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_queue_bind_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_queue_bind_ok
+ end
+
+ def queue_unbind(name, exchange, opts = {})
+ raise_if_no_longer_open!
+
+ exchange_name = if exchange.respond_to?(:name)
+ exchange.name
+ else
+ exchange
+ end
+
+ @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id, name, exchange_name, opts[:routing_key], opts[:arguments]))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_queue_unbind_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_queue_unbind_ok
+ end
+
+
+ # exchange.*
+
+ def exchange_declare(name, type, opts = {})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id, name, type.to_s, opts.fetch(:passive, false), opts.fetch(:durable, false), opts.fetch(:auto_delete, false), false, false, opts[:arguments]))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_exchange_declare_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_exchange_declare_ok
+ end
+
+ def exchange_delete(name, opts = {})
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id, name, opts[:if_unused], false))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_exchange_delete_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_exchange_delete_ok
+ end
+
+ def exchange_bind(source, destination, opts = {})
+ raise_if_no_longer_open!
+
+ source_name = if source.respond_to?(:name)
+ source.name
+ else
+ source
+ end
+
+ destination_name = if destination.respond_to?(:name)
+ destination.name
+ else
+ destination
+ end
+
+ @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments]))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_exchange_bind_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_exchange_bind_ok
+ end
+
+ def exchange_unbind(source, destination, opts = {})
+ raise_if_no_longer_open!
+
+ source_name = if source.respond_to?(:name)
+ source.name
+ else
+ source
+ end
+
+ destination_name = if destination.respond_to?(:name)
+ destination.name
+ else
+ destination
+ end
+
+ @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id, destination_name, source_name, opts[:routing_key], false, opts[:arguments]))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_exchange_unbind_ok = @continuations.pop
+ end
+
+ raise_if_continuation_resulted_in_a_channel_error!
+ @last_exchange_unbind_ok
+ end
+
+ # channel.*
+
+ def channel_flow(active)
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_channel_flow_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_channel_flow_ok
+ end
+
+ # tx.*
+
+ def tx_select
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_tx_select_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_tx_select_ok
+ end
+
+ def tx_commit
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_tx_commit_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_tx_commit_ok
+ end
+
+ def tx_rollback
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_tx_rollback_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_tx_rollback_ok
+ end
+
+ # confirm.*
+
+ def confirm_select
+ raise_if_no_longer_open!
+
+ @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
+ Bunny::Timer.timeout(1, ClientTimeout) do
+ @last_confirm_select_ok = @continuations.pop
+ end
+ raise_if_continuation_resulted_in_a_channel_error!
+
+ @last_confirm_select_ok
+ end
+
+
+ #
+ # Implementation
+ #
+
+ def handle_method(method)
+ # puts "Channel#handle_frame on channel #{@id}: #{method.inspect}"
+ case method
+ when AMQ::Protocol::Queue::DeclareOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Queue::DeleteOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Queue::PurgeOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Queue::BindOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Queue::UnbindOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Exchange::BindOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Exchange::UnbindOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Exchange::DeclareOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Exchange::DeleteOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Basic::QosOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Basic::RecoverOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Channel::FlowOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Basic::ConsumeOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Basic::CancelOk then
+ @continuations.push(method)
+ @consumers.delete(method.consumer_tag)
+ when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Tx::SelectOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Confirm::SelectOk then
+ @continuations.push(method)
+ when AMQ::Protocol::Channel::Close then
+ # puts "Exception on channel #{@id}: #{method.reply_code} #{method.reply_text}"
+ closed!
+ @connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id))
+
+ @last_channel_error = instantiate_channel_level_exception(method)
+ @continuations.push(method)
+ when AMQ::Protocol::Channel::CloseOk then
+ @continuations.push(method)
+ else
+ raise "Do not know how to handle #{method.inspect} in Bunny::Channel#handle_method"
+ end
+ end
+
+ def handle_basic_get_ok(basic_get_ok, properties, content)
+ @continuations.push([basic_get_ok, properties, content])
+ end
+
+ def handle_basic_get_empty(basic_get_empty)
+ @continuations.push([nil, nil, nil])
+ end
+
+ def handle_frameset(basic_deliver, properties, content)
+ consumer = @consumers[basic_deliver.consumer_tag]
+ if consumer
+ @work_pool.submit do
+ consumer.call(MessageMetadata.new(basic_deliver, properties), content)
+ end
+ end
+ end
+
+ def handle_basic_return(basic_return, properties, content)
+ x = find_exchange(basic_return.exchange)
+
+ if x
+ x.handle_return(basic_return, properties, content)
+ else
+ # TODO: log a warning
+ end
+ end
+
+ # Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads
+ # that won't do any real work for channels that do not register consumers (e.g. only used for
+ # publishing). MK.
+ def maybe_start_consumer_work_pool!
+ @work_pool.start unless @work_pool.started?
+ end
+
+ def read_next_frame(options = {})
+ @connection.read_next_frame(options = {})
+ end
+
+ # Synchronizes given block using this channel's mutex.
+ # @api public
+ def synchronize(&block)
+ @mutex.synchronize(&block)
+ end
+
+ def register_queue(queue)
+ @queues[queue.name] = queue
+ end
+
+ def find_queue(name)
+ @queues[name]
+ end
+
+ def register_exchange(exchange)
+ @exchanges[exchange.name] = exchange
+ end
+
+ def find_exchange(name)
+ @exchanges[name]
+ end
+
+ protected
+
+ def closed!
+ @status = :closed
+ @work_pool.shutdown
+ @connection.release_channel_id(@id)
+ end
+
+ def instantiate_channel_level_exception(frame)
+ case frame
+ when AMQ::Protocol::Channel::Close then
+ klass = case frame.reply_code
+ when 403 then
+ AccessRefused
+ when 404 then
+ NotFound
+ when 405 then
+ ResourceLocked
+ when 406 then
+ PreconditionFailed
+ else
+ ChannelLevelException
+ end
+
+ klass.new(frame.reply_text, self, frame)
+ end
+ end
+
+ def raise_if_continuation_resulted_in_a_channel_error!
+ raise @last_channel_error if @last_channel_error
+ end
+
+ def raise_if_no_longer_open!
+ raise ChannelAlreadyClosed.new("cannot use a channel that was already closed! Channel id: #{@id}", self) if closed?
+ end
+
+ # Unique string supposed to be used as a consumer tag.
+ #
+ # @return [String] Unique string.
+ # @api plugin
+ def generate_consumer_tag(name = "bunny")
+ "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
+ end
end
end