lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.4.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.0

- old
+ new

@@ -1,23 +1,33 @@ require 'bunny' +require 'forwardable' module MessageDriver class Broker def bunny_adapter MessageDriver::Adapters::BunnyAdapter end end module Adapters class BunnyAdapter < Base - NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze + NETWORK_ERRORS = [Bunny::TCPConnectionFailed, + Bunny::ConnectionClosedError, + Bunny::ConnectionLevelException, + Bunny::NetworkErrorWrapper, + Bunny::NetworkFailure, + IOError].freeze class Message < MessageDriver::Message::Base attr_reader :delivery_info - def initialize(ctx, delivery_info, properties, payload) - super(ctx, payload, properties[:headers]||{}, properties) + def initialize(ctx, delivery_info, properties, payload, destination) + raw_body = payload + raw_headers = properties[:headers] + raw_headers = {} if raw_headers.nil? + b, h, p = destination.middleware.on_consume(payload, raw_headers, properties) + super(ctx, b, h, p, raw_body) @delivery_info = delivery_info end def delivery_tag delivery_info.delivery_tag @@ -27,14 +37,15 @@ delivery_info.redelivered? end end class Destination < MessageDriver::Destination::Base - def publish_params(headers, properties) - props = @message_props.merge(properties) - props[:headers] = headers if headers - [exchange_name, routing_key(properties), props] + def publish_params(body, headers, properties) + b, h, p = middleware.on_publish(body, headers, properties) + props = @message_props.merge(p) + props[:headers] = h if h + [b, exchange_name, routing_key(properties), props] end def exchange_name @name end @@ -45,31 +56,39 @@ end class QueueDestination < Destination def after_initialize(adapter_context) if @dest_options[:no_declare] - raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' if @name.empty? - raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' if @dest_options[:bindings] + if @name.empty? + fail MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' + end + if @dest_options[:bindings] + fail MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' + end else adapter_context.with_channel(false) do |ch| - bunny_queue(ch, true) + bunny_queue(ch, init: true) end end end - def bunny_queue(channel, initialize=false) - queue = channel.queue(@name, @dest_options) - if initialize - @name = queue.name - if (bindings = @dest_options[:bindings]) - bindings.each do |bnd| - raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] - queue.bind(bnd[:source], bnd[:args]||{}) - end + def bunny_queue(channel, options = {}) + opts = @dest_options.dup + opts.merge!(passive: options[:passive]) if options.key? :passive + queue = channel.queue(@name, opts) + handle_queue_init(queue) if options.fetch(:init, false) + queue + end + + def handle_queue_init(queue) + @name = queue.name + if (bindings = @dest_options[:bindings]) + bindings.each do |bnd| + fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + queue.bind(bnd[:source], bnd[:args] || {}) end end - queue end def exchange_name '' end @@ -78,14 +97,24 @@ @name end def message_count adapter.broker.client.current_adapter_context.with_channel(false) do |ch| - ch.queue(@name, @dest_options.merge(passive: true)).message_count + bunny_queue(ch, passive: true).message_count end end + def subscribe(options = {}, &consumer) + adapter.broker.client.current_adapter_context.subscribe(self, options, &consumer) + end + + def consumer_count + adapter.broker.client.current_adapter_context.with_channel(false) do |ch| + bunny_queue(ch, passive: true).consumer_count + end + end + def purge adapter.broker.client.current_adapter_context.with_channel(false) do |ch| bunny_queue(ch).purge end end @@ -94,40 +123,45 @@ class ExchangeDestination < Destination def after_initialize(adapter_context) if (declare = @dest_options[:declare]) adapter_context.with_channel(false) do |ch| type = declare.delete(:type) - raise MessageDriver::Error, 'you must provide a valid exchange type' unless type + fail MessageDriver::Error, 'you must provide a valid exchange type' unless type ch.exchange_declare(@name, type, declare) end end if (bindings = @dest_options[:bindings]) adapter_context.with_channel(false) do |ch| bindings.each do |bnd| - raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] - ch.exchange_bind(bnd[:source], @name, bnd[:args]||{}) + fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + ch.exchange_bind(bnd[:source], @name, bnd[:args] || {}) end end end end end class Subscription < Subscription::Base + attr_reader :sub_ctx, :error_handler + def start - raise MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' unless destination.is_a? QueueDestination + unless destination.is_a? QueueDestination + fail MessageDriver::Error, + 'subscriptions are only supported with QueueDestinations' + end @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] - @ack_mode = case options[:ack] - when :auto, nil - :auto - when :manual - :manual - when :transactional - :transactional - else - raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" - end + @message_handler = case options[:ack] + when :auto, nil + AutoAckHandler.new(self) + when :manual + ManualAckHandler.new(self) + when :transactional + TransactionalAckHandler.new(self) + else + fail MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" + end start_subscription end def unsubscribe unless @bunny_consumer.nil? @@ -140,65 +174,89 @@ end end private - def start_subscription - @sub_ctx.with_channel do |ch| - queue = destination.bunny_queue(@sub_ctx.channel) - if options.key? :prefetch_size - ch.prefetch(options[:prefetch_size]) + class MessageHandler + extend Forwardable + include Logging + + attr_accessor :subscription + def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options + + def initialize(subscription) + @subscription = subscription + end + + def call(message) + consumer.call(message) + rescue => e + error_handler.call(e, message) unless error_handler.nil? + end + + def nack_message(e, message) + requeue = true + if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) + requeue = false end - @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| - adapter.broker.client.with_adapter_context(@sub_ctx) do - message = @sub_ctx.args_to_message(delivery_info, properties, payload) - handle_message(message) + if sub_ctx.valid? + begin + sub_ctx.nack_message(message, requeue: requeue) + rescue => e + logger.error exception_to_str(e) end end end end - def handle_message(message) - begin - case @ack_mode - when :auto + class ManualAckHandler < MessageHandler + # all functionality implemented in super class + end + + class AutoAckHandler < MessageHandler + def call(message) + consumer.call(message) + sub_ctx.ack_message(message) + rescue => e + nack_message(e, message) + error_handler.call(e, message) unless error_handler.nil? + end + end + + class TransactionalAckHandler < MessageHandler + def call(message) + adapter.broker.client.with_message_transaction do consumer.call(message) - @sub_ctx.ack_message(message) - when :manual - consumer.call(message) - when :transactional - adapter.broker.client.with_message_transaction do - consumer.call(message) - @sub_ctx.ack_message(message) - end + sub_ctx.ack_message(message) end rescue => e - if [:auto, :transactional].include? @ack_mode - requeue = true - if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) - requeue = false + nack_message(e, message) + error_handler.call(e, message) unless error_handler.nil? + end + end + + def start_subscription + @sub_ctx.with_channel do |ch| + queue = destination.bunny_queue(@sub_ctx.channel) + ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size + @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| + adapter.broker.client.with_adapter_context(@sub_ctx) do + message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination) + @message_handler.call(message) end - if @sub_ctx.valid? - begin - @sub_ctx.nack_message(message, requeue: requeue) - rescue => e - logger.error exception_to_str(e) - end - end end - @error_handler.call(e, message) unless @error_handler.nil? end end end def initialize(broker, config) validate_bunny_version @broker = broker @config = config end - def connection(ensure_started=true) + def connection(ensure_started = true) if ensure_started begin @connection ||= Bunny::Session.new(@config) @connection.start rescue *NETWORK_ERRORS => e @@ -210,18 +268,16 @@ end @connection end def stop - begin - super - @connection.close unless @connection.nil? - rescue => e - logger.error "error while attempting connection close\n#{exception_to_str(e)}" - ensure - @connection = nil - end + super + @connection.close unless @connection.nil? + rescue => e + logger.error "error while attempting connection close\n#{exception_to_str(e)}" + ensure + @connection = nil end def build_context BunnyContext.new(self) end @@ -242,35 +298,41 @@ @rollback_only = false @need_channel_reset = false @in_transaction = false end - def create_destination(name, dest_options={}, message_props={}) - dest = case type = dest_options.delete(:type) - when :exchange - ExchangeDestination.new(self.adapter, name, dest_options, message_props) - when :queue, nil - QueueDestination.new(self.adapter, name, dest_options, message_props) - else - raise MessageDriver::Error, "invalid destination type #{type}" - end + def create_destination(name, dest_options = {}, message_props = {}) + dest = case type = dest_options.delete(:type) + when :exchange + ExchangeDestination.new(adapter, name, dest_options, message_props) + when :queue, nil + QueueDestination.new(adapter, name, dest_options, message_props) + else + fail MessageDriver::Error, "invalid destination type #{type}" + end dest.after_initialize(self) dest end def supports_transactions? true end - def begin_transaction(options={}) - raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction? + def begin_transaction(options = {}) + if in_transaction? + fail MessageDriver::TransactionError, + "you can't begin another transaction, you are already in one!" + end @in_transaction = true @in_confirms_transaction = true if options[:type] == :confirm_and_wait end - def commit_transaction(channel_commit=false) - raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit + def commit_transaction(channel_commit = false) + if !in_transaction? && !channel_commit + fail MessageDriver::TransactionError, + "you can't finish the transaction unless you already in one!" + end begin if @in_confirms_transaction wait_for_confirms(@channel) unless @rollback_only else if is_transactional? && valid? && !@need_channel_reset @@ -289,13 +351,12 @@ @in_confirms_transaction = false end end def wait_for_confirms(channel) - until channel.unconfirmed_set.empty? - channel.wait_for_confirms - end + # FIXME: make the thread-safety of this better once https://github.com/ruby-amqp/bunny/issues/227 is fixed + channel.wait_for_confirms until channel.unconfirmed_set.empty? end private :wait_for_confirms def rollback_transaction @rollback_only = true @@ -309,12 +370,12 @@ def in_transaction? @in_transaction end - def publish(destination, body, headers={}, properties={}) - exchange, routing_key, props = *destination.publish_params(headers, properties) + def publish(destination, body, headers = {}, properties = {}) + body, exchange, routing_key, props = *destination.publish_params(body, headers, properties) confirm = props.delete(:confirm) confirm = false if confirm.nil? with_channel(true) do |ch| if confirm == true ch.confirm_select unless ch.using_publisher_confirms? @@ -322,53 +383,53 @@ ch.basic_publish(body, exchange, routing_key, props) ch.wait_for_confirms if confirm == true end end - def pop_message(destination, options={}) - raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination + def pop_message(destination, options = {}) + fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination with_channel(false) do |ch| queue = ch.queue(destination.name, passive: true) message = queue.pop(ack: options.fetch(:client_ack, false)) if message.nil? || message[0].nil? nil else - args_to_message(*message) + args_to_message(*message, destination) end end end def supports_client_acks? true end - def ack_message(message, _options={}) + def ack_message(message, _options = {}) with_channel(true) do |ch| ch.ack(message.delivery_tag) end end - def nack_message(message, options={}) + def nack_message(message, options = {}) requeue = options[:requeue].is_a?(FalseClass) ? false : true with_channel(true) do |ch| ch.reject(message.delivery_tag, requeue) end end def supports_subscriptions? true end - def subscribe(destination, options={}, &consumer) + def subscribe(destination, options = {}, &consumer) sub = Subscription.new(adapter, destination, consumer, options) sub.start sub end - def invalidate(in_unsubscribe=false) + def invalidate(in_unsubscribe = false) super() unless @subscription.nil? || in_unsubscribe begin @subscription.unsubscribe if adapter.connection.open? rescue => e @@ -385,34 +446,32 @@ end end end def handle_errors - begin - yield - rescue Bunny::ChannelLevelException => e - @need_channel_reset = true - @rollback_only = true if in_transaction? - if e.is_a? Bunny::NotFound - raise MessageDriver::QueueNotFound.new(e.to_s, e) - else - raise MessageDriver::WrappedError.new(e.to_s, e) - end - rescue Bunny::ChannelAlreadyClosed => e - @need_channel_reset = true - @rollback_only = true if in_transaction? + yield + rescue Bunny::ChannelLevelException => e + @need_channel_reset = true + @rollback_only = true if in_transaction? + if e.is_a? Bunny::NotFound + raise MessageDriver::QueueNotFound.new(e.to_s, e) + else raise MessageDriver::WrappedError.new(e.to_s, e) - rescue *NETWORK_ERRORS => e - @need_channel_reset = true - @rollback_only = true if in_transaction? - raise MessageDriver::ConnectionError.new(e.to_s, e) end + rescue Bunny::ChannelAlreadyClosed => e + @need_channel_reset = true + @rollback_only = true if in_transaction? + raise MessageDriver::WrappedError.new(e.to_s, e) + rescue *NETWORK_ERRORS => e + @need_channel_reset = true + @rollback_only = true if in_transaction? + raise MessageDriver::ConnectionError.new(e.to_s, e) end - def with_channel(require_commit=true) - raise MessageDriver::TransactionRollbackOnly if @rollback_only - raise MessageDriver::Error, 'this adapter context is not valid!' unless valid? + def with_channel(require_commit = true) + fail MessageDriver::TransactionRollbackOnly if @rollback_only + fail MessageDriver::Error, 'this adapter context is not valid!' unless valid? @channel = adapter.connection.create_channel if @channel.nil? reset_channel if @need_channel_reset if in_transaction? if @in_confirms_transaction @channel.confirm_select unless @channel.using_publisher_confirmations? @@ -428,12 +487,12 @@ commit_transaction(true) if require_commit && is_transactional? && !in_transaction? result end end - def args_to_message(delivery_info, properties, payload) - Message.new(self, delivery_info, properties, payload) + def args_to_message(delivery_info, properties, payload, destination) + Message.new(self, delivery_info, properties, payload, destination) end private def reset_channel @@ -447,21 +506,19 @@ end private def log_errors - begin - yield - rescue => e - logger.error exception_to_str(e) - end + yield + rescue => e + logger.error exception_to_str(e) end def validate_bunny_version required = Gem::Requirement.create('>= 1.2.2') current = Gem::Version.create(Bunny::VERSION) unless required.satisfied_by? current - raise MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter' + fail MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter' end end end end end