lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.4.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.5.0
- old
+ new
@@ -1,23 +1,33 @@
require 'bunny'
+require 'forwardable'
module MessageDriver
class Broker
def bunny_adapter
MessageDriver::Adapters::BunnyAdapter
end
end
module Adapters
class BunnyAdapter < Base
- NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze
+ NETWORK_ERRORS = [Bunny::TCPConnectionFailed,
+ Bunny::ConnectionClosedError,
+ Bunny::ConnectionLevelException,
+ Bunny::NetworkErrorWrapper,
+ Bunny::NetworkFailure,
+ IOError].freeze
class Message < MessageDriver::Message::Base
attr_reader :delivery_info
- def initialize(ctx, delivery_info, properties, payload)
- super(ctx, payload, properties[:headers]||{}, properties)
+ def initialize(ctx, delivery_info, properties, payload, destination)
+ raw_body = payload
+ raw_headers = properties[:headers]
+ raw_headers = {} if raw_headers.nil?
+ b, h, p = destination.middleware.on_consume(payload, raw_headers, properties)
+ super(ctx, b, h, p, raw_body)
@delivery_info = delivery_info
end
def delivery_tag
delivery_info.delivery_tag
@@ -27,14 +37,15 @@
delivery_info.redelivered?
end
end
class Destination < MessageDriver::Destination::Base
- def publish_params(headers, properties)
- props = @message_props.merge(properties)
- props[:headers] = headers if headers
- [exchange_name, routing_key(properties), props]
+ def publish_params(body, headers, properties)
+ b, h, p = middleware.on_publish(body, headers, properties)
+ props = @message_props.merge(p)
+ props[:headers] = h if h
+ [b, exchange_name, routing_key(properties), props]
end
def exchange_name
@name
end
@@ -45,31 +56,39 @@
end
class QueueDestination < Destination
def after_initialize(adapter_context)
if @dest_options[:no_declare]
- raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' if @name.empty?
- raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' if @dest_options[:bindings]
+ if @name.empty?
+ fail MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true'
+ end
+ if @dest_options[:bindings]
+ fail MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true'
+ end
else
adapter_context.with_channel(false) do |ch|
- bunny_queue(ch, true)
+ bunny_queue(ch, init: true)
end
end
end
- def bunny_queue(channel, initialize=false)
- queue = channel.queue(@name, @dest_options)
- if initialize
- @name = queue.name
- if (bindings = @dest_options[:bindings])
- bindings.each do |bnd|
- raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
- queue.bind(bnd[:source], bnd[:args]||{})
- end
+ def bunny_queue(channel, options = {})
+ opts = @dest_options.dup
+ opts.merge!(passive: options[:passive]) if options.key? :passive
+ queue = channel.queue(@name, opts)
+ handle_queue_init(queue) if options.fetch(:init, false)
+ queue
+ end
+
+ def handle_queue_init(queue)
+ @name = queue.name
+ if (bindings = @dest_options[:bindings])
+ bindings.each do |bnd|
+ fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ queue.bind(bnd[:source], bnd[:args] || {})
end
end
- queue
end
def exchange_name
''
end
@@ -78,14 +97,24 @@
@name
end
def message_count
adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
- ch.queue(@name, @dest_options.merge(passive: true)).message_count
+ bunny_queue(ch, passive: true).message_count
end
end
+ def subscribe(options = {}, &consumer)
+ adapter.broker.client.current_adapter_context.subscribe(self, options, &consumer)
+ end
+
+ def consumer_count
+ adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
+ bunny_queue(ch, passive: true).consumer_count
+ end
+ end
+
def purge
adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
bunny_queue(ch).purge
end
end
@@ -94,40 +123,45 @@
class ExchangeDestination < Destination
def after_initialize(adapter_context)
if (declare = @dest_options[:declare])
adapter_context.with_channel(false) do |ch|
type = declare.delete(:type)
- raise MessageDriver::Error, 'you must provide a valid exchange type' unless type
+ fail MessageDriver::Error, 'you must provide a valid exchange type' unless type
ch.exchange_declare(@name, type, declare)
end
end
if (bindings = @dest_options[:bindings])
adapter_context.with_channel(false) do |ch|
bindings.each do |bnd|
- raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
- ch.exchange_bind(bnd[:source], @name, bnd[:args]||{})
+ fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ ch.exchange_bind(bnd[:source], @name, bnd[:args] || {})
end
end
end
end
end
class Subscription < Subscription::Base
+ attr_reader :sub_ctx, :error_handler
+
def start
- raise MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' unless destination.is_a? QueueDestination
+ unless destination.is_a? QueueDestination
+ fail MessageDriver::Error,
+ 'subscriptions are only supported with QueueDestinations'
+ end
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
- @ack_mode = case options[:ack]
- when :auto, nil
- :auto
- when :manual
- :manual
- when :transactional
- :transactional
- else
- raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
- end
+ @message_handler = case options[:ack]
+ when :auto, nil
+ AutoAckHandler.new(self)
+ when :manual
+ ManualAckHandler.new(self)
+ when :transactional
+ TransactionalAckHandler.new(self)
+ else
+ fail MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
+ end
start_subscription
end
def unsubscribe
unless @bunny_consumer.nil?
@@ -140,65 +174,89 @@
end
end
private
- def start_subscription
- @sub_ctx.with_channel do |ch|
- queue = destination.bunny_queue(@sub_ctx.channel)
- if options.key? :prefetch_size
- ch.prefetch(options[:prefetch_size])
+ class MessageHandler
+ extend Forwardable
+ include Logging
+
+ attr_accessor :subscription
+ def_delegators :subscription, :adapter, :sub_ctx, :consumer, :error_handler, :options
+
+ def initialize(subscription)
+ @subscription = subscription
+ end
+
+ def call(message)
+ consumer.call(message)
+ rescue => e
+ error_handler.call(e, message) unless error_handler.nil?
+ end
+
+ def nack_message(e, message)
+ requeue = true
+ if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
+ requeue = false
end
- @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
- adapter.broker.client.with_adapter_context(@sub_ctx) do
- message = @sub_ctx.args_to_message(delivery_info, properties, payload)
- handle_message(message)
+ if sub_ctx.valid?
+ begin
+ sub_ctx.nack_message(message, requeue: requeue)
+ rescue => e
+ logger.error exception_to_str(e)
end
end
end
end
- def handle_message(message)
- begin
- case @ack_mode
- when :auto
+ class ManualAckHandler < MessageHandler
+ # all functionality implemented in super class
+ end
+
+ class AutoAckHandler < MessageHandler
+ def call(message)
+ consumer.call(message)
+ sub_ctx.ack_message(message)
+ rescue => e
+ nack_message(e, message)
+ error_handler.call(e, message) unless error_handler.nil?
+ end
+ end
+
+ class TransactionalAckHandler < MessageHandler
+ def call(message)
+ adapter.broker.client.with_message_transaction do
consumer.call(message)
- @sub_ctx.ack_message(message)
- when :manual
- consumer.call(message)
- when :transactional
- adapter.broker.client.with_message_transaction do
- consumer.call(message)
- @sub_ctx.ack_message(message)
- end
+ sub_ctx.ack_message(message)
end
rescue => e
- if [:auto, :transactional].include? @ack_mode
- requeue = true
- if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
- requeue = false
+ nack_message(e, message)
+ error_handler.call(e, message) unless error_handler.nil?
+ end
+ end
+
+ def start_subscription
+ @sub_ctx.with_channel do |ch|
+ queue = destination.bunny_queue(@sub_ctx.channel)
+ ch.prefetch(options[:prefetch_size]) if options.key? :prefetch_size
+ @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
+ adapter.broker.client.with_adapter_context(@sub_ctx) do
+ message = @sub_ctx.args_to_message(delivery_info, properties, payload, destination)
+ @message_handler.call(message)
end
- if @sub_ctx.valid?
- begin
- @sub_ctx.nack_message(message, requeue: requeue)
- rescue => e
- logger.error exception_to_str(e)
- end
- end
end
- @error_handler.call(e, message) unless @error_handler.nil?
end
end
end
def initialize(broker, config)
validate_bunny_version
@broker = broker
@config = config
end
- def connection(ensure_started=true)
+ def connection(ensure_started = true)
if ensure_started
begin
@connection ||= Bunny::Session.new(@config)
@connection.start
rescue *NETWORK_ERRORS => e
@@ -210,18 +268,16 @@
end
@connection
end
def stop
- begin
- super
- @connection.close unless @connection.nil?
- rescue => e
- logger.error "error while attempting connection close\n#{exception_to_str(e)}"
- ensure
- @connection = nil
- end
+ super
+ @connection.close unless @connection.nil?
+ rescue => e
+ logger.error "error while attempting connection close\n#{exception_to_str(e)}"
+ ensure
+ @connection = nil
end
def build_context
BunnyContext.new(self)
end
@@ -242,35 +298,41 @@
@rollback_only = false
@need_channel_reset = false
@in_transaction = false
end
- def create_destination(name, dest_options={}, message_props={})
- dest = case type = dest_options.delete(:type)
- when :exchange
- ExchangeDestination.new(self.adapter, name, dest_options, message_props)
- when :queue, nil
- QueueDestination.new(self.adapter, name, dest_options, message_props)
- else
- raise MessageDriver::Error, "invalid destination type #{type}"
- end
+ def create_destination(name, dest_options = {}, message_props = {})
+ dest = case type = dest_options.delete(:type)
+ when :exchange
+ ExchangeDestination.new(adapter, name, dest_options, message_props)
+ when :queue, nil
+ QueueDestination.new(adapter, name, dest_options, message_props)
+ else
+ fail MessageDriver::Error, "invalid destination type #{type}"
+ end
dest.after_initialize(self)
dest
end
def supports_transactions?
true
end
- def begin_transaction(options={})
- raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction?
+ def begin_transaction(options = {})
+ if in_transaction?
+ fail MessageDriver::TransactionError,
+ "you can't begin another transaction, you are already in one!"
+ end
@in_transaction = true
@in_confirms_transaction = true if options[:type] == :confirm_and_wait
end
- def commit_transaction(channel_commit=false)
- raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit
+ def commit_transaction(channel_commit = false)
+ if !in_transaction? && !channel_commit
+ fail MessageDriver::TransactionError,
+ "you can't finish the transaction unless you already in one!"
+ end
begin
if @in_confirms_transaction
wait_for_confirms(@channel) unless @rollback_only
else
if is_transactional? && valid? && !@need_channel_reset
@@ -289,13 +351,12 @@
@in_confirms_transaction = false
end
end
def wait_for_confirms(channel)
- until channel.unconfirmed_set.empty?
- channel.wait_for_confirms
- end
+ # FIXME: make the thread-safety of this better once https://github.com/ruby-amqp/bunny/issues/227 is fixed
+ channel.wait_for_confirms until channel.unconfirmed_set.empty?
end
private :wait_for_confirms
def rollback_transaction
@rollback_only = true
@@ -309,12 +370,12 @@
def in_transaction?
@in_transaction
end
- def publish(destination, body, headers={}, properties={})
- exchange, routing_key, props = *destination.publish_params(headers, properties)
+ def publish(destination, body, headers = {}, properties = {})
+ body, exchange, routing_key, props = *destination.publish_params(body, headers, properties)
confirm = props.delete(:confirm)
confirm = false if confirm.nil?
with_channel(true) do |ch|
if confirm == true
ch.confirm_select unless ch.using_publisher_confirms?
@@ -322,53 +383,53 @@
ch.basic_publish(body, exchange, routing_key, props)
ch.wait_for_confirms if confirm == true
end
end
- def pop_message(destination, options={})
- raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
+ def pop_message(destination, options = {})
+ fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
with_channel(false) do |ch|
queue = ch.queue(destination.name, passive: true)
message = queue.pop(ack: options.fetch(:client_ack, false))
if message.nil? || message[0].nil?
nil
else
- args_to_message(*message)
+ args_to_message(*message, destination)
end
end
end
def supports_client_acks?
true
end
- def ack_message(message, _options={})
+ def ack_message(message, _options = {})
with_channel(true) do |ch|
ch.ack(message.delivery_tag)
end
end
- def nack_message(message, options={})
+ def nack_message(message, options = {})
requeue = options[:requeue].is_a?(FalseClass) ? false : true
with_channel(true) do |ch|
ch.reject(message.delivery_tag, requeue)
end
end
def supports_subscriptions?
true
end
- def subscribe(destination, options={}, &consumer)
+ def subscribe(destination, options = {}, &consumer)
sub = Subscription.new(adapter, destination, consumer, options)
sub.start
sub
end
- def invalidate(in_unsubscribe=false)
+ def invalidate(in_unsubscribe = false)
super()
unless @subscription.nil? || in_unsubscribe
begin
@subscription.unsubscribe if adapter.connection.open?
rescue => e
@@ -385,34 +446,32 @@
end
end
end
def handle_errors
- begin
- yield
- rescue Bunny::ChannelLevelException => e
- @need_channel_reset = true
- @rollback_only = true if in_transaction?
- if e.is_a? Bunny::NotFound
- raise MessageDriver::QueueNotFound.new(e.to_s, e)
- else
- raise MessageDriver::WrappedError.new(e.to_s, e)
- end
- rescue Bunny::ChannelAlreadyClosed => e
- @need_channel_reset = true
- @rollback_only = true if in_transaction?
+ yield
+ rescue Bunny::ChannelLevelException => e
+ @need_channel_reset = true
+ @rollback_only = true if in_transaction?
+ if e.is_a? Bunny::NotFound
+ raise MessageDriver::QueueNotFound.new(e.to_s, e)
+ else
raise MessageDriver::WrappedError.new(e.to_s, e)
- rescue *NETWORK_ERRORS => e
- @need_channel_reset = true
- @rollback_only = true if in_transaction?
- raise MessageDriver::ConnectionError.new(e.to_s, e)
end
+ rescue Bunny::ChannelAlreadyClosed => e
+ @need_channel_reset = true
+ @rollback_only = true if in_transaction?
+ raise MessageDriver::WrappedError.new(e.to_s, e)
+ rescue *NETWORK_ERRORS => e
+ @need_channel_reset = true
+ @rollback_only = true if in_transaction?
+ raise MessageDriver::ConnectionError.new(e.to_s, e)
end
- def with_channel(require_commit=true)
- raise MessageDriver::TransactionRollbackOnly if @rollback_only
- raise MessageDriver::Error, 'this adapter context is not valid!' unless valid?
+ def with_channel(require_commit = true)
+ fail MessageDriver::TransactionRollbackOnly if @rollback_only
+ fail MessageDriver::Error, 'this adapter context is not valid!' unless valid?
@channel = adapter.connection.create_channel if @channel.nil?
reset_channel if @need_channel_reset
if in_transaction?
if @in_confirms_transaction
@channel.confirm_select unless @channel.using_publisher_confirmations?
@@ -428,12 +487,12 @@
commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
result
end
end
- def args_to_message(delivery_info, properties, payload)
- Message.new(self, delivery_info, properties, payload)
+ def args_to_message(delivery_info, properties, payload, destination)
+ Message.new(self, delivery_info, properties, payload, destination)
end
private
def reset_channel
@@ -447,21 +506,19 @@
end
private
def log_errors
- begin
- yield
- rescue => e
- logger.error exception_to_str(e)
- end
+ yield
+ rescue => e
+ logger.error exception_to_str(e)
end
def validate_bunny_version
required = Gem::Requirement.create('>= 1.2.2')
current = Gem::Version.create(Bunny::VERSION)
unless required.satisfied_by? current
- raise MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter'
+ fail MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter'
end
end
end
end
end