lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.6.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.0

- old
+ new

@@ -23,11 +23,11 @@ def initialize(ctx, delivery_info, properties, payload, destination) raw_body = payload raw_headers = properties[:headers] raw_headers = {} if raw_headers.nil? b, h, p = destination.middleware.on_consume(payload, raw_headers, properties) - super(ctx, b, h, p, raw_body) + super(ctx, destination, b, h, p, raw_body) @delivery_info = delivery_info end def delivery_tag delivery_info.delivery_tag @@ -57,35 +57,35 @@ class QueueDestination < Destination def after_initialize(adapter_context) if @dest_options[:no_declare] if @name.empty? - fail MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' + raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' end if @dest_options[:bindings] - fail MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' + raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' end else adapter_context.with_channel(false) do |ch| bunny_queue(ch, init: true) end end end def bunny_queue(channel, options = {}) opts = @dest_options.dup - opts.merge!(passive: options[:passive]) if options.key? :passive + opts[:passive] = options[:passive] if options.key? :passive queue = channel.queue(@name, opts) handle_queue_init(queue) if options.fetch(:init, false) queue end def handle_queue_init(queue) @name = queue.name if (bindings = @dest_options[:bindings]) bindings.each do |bnd| - fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] queue.bind(bnd[:source], bnd[:args] || {}) end end end @@ -95,46 +95,46 @@ def routing_key(_properties) @name end - def message_count - adapter.broker.client.current_adapter_context.with_channel(false) do |ch| + def handle_message_count + current_adapter_context.with_channel(false) do |ch| bunny_queue(ch, passive: true).message_count end end def subscribe(options = {}, &consumer) - adapter.broker.client.current_adapter_context.subscribe(self, options, &consumer) + current_adapter_context.subscribe(self, options, &consumer) end - def consumer_count - adapter.broker.client.current_adapter_context.with_channel(false) do |ch| + def handle_consumer_count + current_adapter_context.with_channel(false) do |ch| bunny_queue(ch, passive: true).consumer_count end end def purge - adapter.broker.client.current_adapter_context.with_channel(false) do |ch| + current_adapter_context.with_channel(false) do |ch| bunny_queue(ch).purge end end end class ExchangeDestination < Destination def after_initialize(adapter_context) if (declare = @dest_options[:declare]) adapter_context.with_channel(false) do |ch| type = declare.delete(:type) - fail MessageDriver::Error, 'you must provide a valid exchange type' unless type + raise MessageDriver::Error, 'you must provide a valid exchange type' unless type ch.exchange_declare(@name, type, declare) end end if (bindings = @dest_options[:bindings]) adapter_context.with_channel(false) do |ch| bindings.each do |bnd| - fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] ch.exchange_bind(bnd[:source], @name, bnd[:args] || {}) end end end end @@ -143,12 +143,12 @@ class Subscription < Subscription::Base attr_reader :sub_ctx, :error_handler def start unless destination.is_a? QueueDestination - fail MessageDriver::Error, - 'subscriptions are only supported with QueueDestinations' + raise MessageDriver::Error, + 'subscriptions are only supported with QueueDestinations' end @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] @message_handler = case options.delete(:ack) when :auto, nil @@ -156,11 +156,11 @@ when :manual ManualAckHandler.new(self) when :transactional TransactionalAckHandler.new(self) else - fail MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" + raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" end start_subscription end def unsubscribe @@ -303,54 +303,50 @@ @need_channel_reset = false @in_transaction = false @require_commit = false end - def create_destination(name, dest_options = {}, message_props = {}) + def handle_create_destination(name, dest_options = {}, message_props = {}) dest = case type = dest_options.delete(:type) when :exchange ExchangeDestination.new(adapter, name, dest_options, message_props) when :queue, nil QueueDestination.new(adapter, name, dest_options, message_props) else - fail MessageDriver::Error, "invalid destination type #{type}" + raise MessageDriver::Error, "invalid destination type #{type}" end dest.after_initialize(self) dest end def supports_transactions? true end - def begin_transaction(options = {}) + def handle_begin_transaction(options = {}) if in_transaction? - fail MessageDriver::TransactionError, - "you can't begin another transaction, you are already in one!" + raise MessageDriver::TransactionError, + "you can't begin another transaction, you are already in one!" end @in_transaction = true @in_confirms_transaction = true if options[:type] == :confirm_and_wait end - def commit_transaction + def handle_commit_transaction(_ = nil) if !in_transaction? && !@require_commit - fail MessageDriver::TransactionError, - "you can't finish the transaction unless you already in one!" + raise MessageDriver::TransactionError, + "you can't finish the transaction unless you already in one!" end begin if @in_confirms_transaction - unless @rollback_only || @channel.nil? - @channel.wait_for_confirms - end - else - if is_transactional? && valid? && !@need_channel_reset && @require_commit - handle_errors do - if @rollback_only - @channel.tx_rollback - else - @channel.tx_commit - end + @channel.wait_for_confirms unless @rollback_only || @channel.nil? + elsif is_transactional? && valid? && !@need_channel_reset && @require_commit + handle_errors do + if @rollback_only + @channel.tx_rollback + else + @channel.tx_commit end end end ensure @rollback_only = false @@ -358,25 +354,25 @@ @in_confirms_transaction = false @require_commit = false end end - def rollback_transaction + def handle_rollback_transaction(_ = nil) @rollback_only = true commit_transaction end def transactional? @is_transactional end - alias_method :is_transactional?, :transactional? + alias is_transactional? transactional? def in_transaction? @in_transaction end - def publish(destination, body, headers = {}, properties = {}) + def handle_publish(destination, body, headers = {}, properties = {}) body, exchange, routing_key, props = *destination.publish_params(body, headers, properties) confirm = props.delete(:confirm) confirm = false if confirm.nil? with_channel(true) do |ch| if confirm == true @@ -385,12 +381,12 @@ ch.basic_publish(body, exchange, routing_key, props) ch.wait_for_confirms if confirm == true end end - def pop_message(destination, options = {}) - fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination + def handle_pop_message(destination, options = {}) + raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination with_channel(false) do |ch| queue = ch.queue(destination.name, passive: true) message = queue.pop(adapter.ack_key => options.fetch(:client_ack, false)) @@ -404,33 +400,49 @@ def supports_client_acks? true end - def ack_message(message, _options = {}) + def handle_ack_message(message, _options = {}) with_channel(true) do |ch| ch.ack(message.delivery_tag) end end - def nack_message(message, options = {}) + def handle_nack_message(message, options = {}) requeue = options.fetch(:requeue, true) with_channel(true) do |ch| ch.reject(message.delivery_tag, requeue) end end def supports_subscriptions? true end - def subscribe(destination, options = {}, &consumer) + def handle_subscribe(destination, options = {}, &consumer) sub = Subscription.new(adapter, destination, consumer, options) sub.start sub end + def handle_message_count(destination) + if destination.respond_to?(:handle_message_count) + destination.handle_message_count + else + super + end + end + + def handle_consumer_count(destination) + if destination.respond_to?(:handle_consumer_count) + destination.handle_consumer_count + else + super + end + end + def invalidate(in_unsubscribe = false) super() unless @subscription.nil? || in_unsubscribe begin @subscription.unsubscribe if adapter.connection.open? @@ -486,12 +498,12 @@ @is_transactional = true end end def with_channel(require_commit = true) - fail MessageDriver::TransactionRollbackOnly if @rollback_only - fail MessageDriver::Error, 'this adapter context is not valid!' unless valid? + raise MessageDriver::TransactionRollbackOnly if @rollback_only + raise MessageDriver::Error, 'this adapter context is not valid!' unless valid? ensure_channel @require_commit ||= require_commit if in_transaction? if @in_confirms_transaction @channel.confirm_select unless @channel.using_publisher_confirmations? @@ -533,10 +545,10 @@ def validate_bunny_version required = Gem::Requirement.create('>= 1.7.0') current = Gem::Version.create(Bunny::VERSION) unless required.satisfied_by? current - fail MessageDriver::Error, 'bunny 1.7.0 or later is required for the bunny adapter' + raise MessageDriver::Error, 'bunny 1.7.0 or later is required for the bunny adapter' end end end end end