lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.1.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc1
- old
+ new
@@ -7,25 +7,34 @@
end
end
module Adapters
class BunnyAdapter < Base
+ NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze
class Message < MessageDriver::Message::Base
attr_reader :delivery_info
def initialize(delivery_info, properties, payload)
super(payload, properties[:headers]||{}, properties)
@delivery_info = delivery_info
end
+
+ def delivery_tag
+ delivery_info.delivery_tag
+ end
+
+ def redelivered?
+ delivery_info.redelivered?
+ end
end
class Destination < MessageDriver::Destination::Base
- def publish(body, headers={}, properties={})
+ def publish_params(headers, properties)
props = @message_props.merge(properties)
props[:headers] = headers if headers
- @adapter.publish(body, exchange_name, routing_key(properties), props)
+ [exchange_name, routing_key(properties), props]
end
def exchange_name
@name
end
@@ -34,236 +43,351 @@
properties[:routing_key]
end
end
class QueueDestination < Destination
- def after_initialize
+ def after_initialize(adapter_context)
unless @dest_options[:no_declare]
- @adapter.current_context.with_channel(false) do |ch|
- queue = ch.queue(@name, @dest_options)
- @name = queue.name
- if bindings = @dest_options[:bindings]
- bindings.each do |bnd|
- raise MessageDriver::Exception, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
- queue.bind(bnd[:source], bnd[:args]||{})
- end
- end
+ adapter_context.with_channel(false) do |ch|
+ bunny_queue(ch, true)
end
else
- raise MessageDriver::Exception, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty?
- raise MessageDriver::Exception, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings]
+ raise MessageDriver::Error, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty?
+ raise MessageDriver::Error, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings]
end
end
+ def bunny_queue(channel, initialize=false)
+ queue = channel.queue(@name, @dest_options)
+ if initialize
+ @name = queue.name
+ if bindings = @dest_options[:bindings]
+ bindings.each do |bnd|
+ raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ queue.bind(bnd[:source], bnd[:args]||{})
+ end
+ end
+ end
+ queue
+ end
+
def exchange_name
""
end
def routing_key(properties)
@name
end
def message_count
- @adapter.current_context.with_channel(false) do |ch|
+ Client.current_adapter_context.with_channel(false) do |ch|
ch.queue(@name, @dest_options.merge(passive: true)).message_count
end
end
- end
- class ExchangeDestination < Destination
- def pop_message(destination, options={})
- raise MessageDriver::Exception, "You can't pop a message off an exchange"
+ def purge
+ Client.current_adapter_context.with_channel(false) do |ch|
+ bunny_queue(ch).purge
+ end
end
+ end
- def after_initialize
+ class ExchangeDestination < Destination
+ def after_initialize(adapter_context)
if declare = @dest_options[:declare]
- @adapter.current_context.with_channel(false) do |ch|
+ adapter_context.with_channel(false) do |ch|
type = declare.delete(:type)
- raise MessageDriver::Exception, "you must provide a valid exchange type" unless type
+ raise MessageDriver::Error, "you must provide a valid exchange type" unless type
ch.exchange_declare(@name, type, declare)
end
end
if bindings = @dest_options[:bindings]
- @adapter.current_context.with_channel(false) do |ch|
+ adapter_context.with_channel(false) do |ch|
bindings.each do |bnd|
- raise MessageDriver::Exception, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
ch.exchange_bind(bnd[:source], @name, bnd[:args]||{})
end
end
end
end
end
+ class Subscription < Subscription::Base
+ def start
+ raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination
+ @sub_ctx = adapter.new_subscription_context(self)
+ @error_handler = options[:error_handler]
+ @sub_ctx.with_channel do |ch|
+ queue = destination.bunny_queue(@sub_ctx.channel)
+ ack_mode = case options[:ack]
+ when :auto, nil
+ :auto
+ when :manual
+ :manual
+ when :transactional
+ :transactional
+ else
+ raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
+ end
+ @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
+ message = @sub_ctx.args_to_message(delivery_info, properties, payload)
+ Client.with_adapter_context(@sub_ctx) do
+ begin
+ case ack_mode
+ when :auto
+ consumer.call(message)
+ @sub_ctx.ack_message(message)
+ when :manual
+ consumer.call(message)
+ when :transactional
+ Client.with_message_transaction do
+ consumer.call(message)
+ @sub_ctx.ack_message(message)
+ end
+ end
+ rescue => e
+ if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
+ if [:auto, :transactional].include? ack_mode
+ @sub_ctx.nack_message(message, requeue: false)
+ end
+ else
+ if @sub_ctx.valid? && ack_mode == :auto
+ begin
+ @sub_ctx.nack_message(message, requeue: true)
+ rescue => e
+ logger.error exception_to_str(e)
+ end
+ end
+ @error_handler.call(e, message) unless @error_handler.nil?
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def unsubscribe
+ unless @bunny_consumer.nil?
+ @bunny_consumer.cancel
+ @bunny_consumer = nil
+ end
+ unless @sub_ctx.nil?
+ @sub_ctx.invalidate(true)
+ @sub_ctx = nil
+ end
+ end
+ end
+
def initialize(config)
validate_bunny_version
-
- @connection = Bunny.new(config.merge(threaded: false))
+ @config = config
end
def connection(ensure_started=true)
+ @connection ||= Bunny.new(@config)
if ensure_started && !@connection.open?
- @connection.start
+ begin
+ @connection.start
+ rescue *NETWORK_ERRORS => e
+ raise MessageDriver::ConnectionError.new(e.to_s, e)
+ end
end
@connection
end
- def publish(body, exchange, routing_key, properties)
- current_context.with_channel(true) do |ch|
- ch.basic_publish(body, exchange, routing_key, properties)
+ def stop
+ begin
+ super
+ @connection.close if !@connection.nil? && @connection.open?
+ rescue *NETWORK_ERRORS => e
+ logger.error "error while attempting connection close\n#{exception_to_str(e)}"
end
end
- def pop_message(destination, options={})
- current_context.with_channel do |ch|
- queue = ch.queue(destination, passive: true)
+ def build_context
+ BunnyContext.new(self)
+ end
- message = queue.pop
- if message.nil? || message[0].nil?
- nil
+ def new_subscription_context(subscription)
+ ctx = new_context
+ ctx.channel = connection.create_channel
+ ctx.subscription = subscription
+ ctx
+ end
+
+ class BunnyContext < ContextBase
+ attr_accessor :channel, :subscription
+
+ def initialize(adapter)
+ super(adapter)
+ @is_transactional = false
+ @rollback_only = false
+ @need_channel_reset = false
+ @in_transaction = false
+ end
+
+ def create_destination(name, dest_options={}, message_props={})
+ dest = case type = dest_options.delete(:type)
+ when :exchange
+ ExchangeDestination.new(self.adapter, name, dest_options, message_props)
+ when :queue, nil
+ QueueDestination.new(self.adapter, name, dest_options, message_props)
else
- Message.new(*message)
+ raise MessageDriver::Error, "invalid destination type #{type}"
end
+ dest.after_initialize(self)
+ dest
end
- end
- def create_destination(name, dest_options={}, message_props={})
- case type = dest_options.delete(:type)
- when :exchange
- ExchangeDestination.new(self, name, dest_options, message_props)
- when :queue, nil
- QueueDestination.new(self, name, dest_options, message_props)
- else
- raise MessageDriver::Exception, "invalid destination type #{type}"
+ def supports_transactions?
+ true
end
- end
- def with_transaction(options={}, &block)
- current_context.with_transaction(&block)
- end
+ def begin_transaction(options={})
+ raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction?
+ unless is_transactional?
+ with_channel(false) do |ch|
+ ch.tx_select
+ end
+ @is_transactional = true
+ end
+ @in_transaction = true
+ end
- def stop
- @connection.close if @connection.open?
- @context = nil
- end
+ def commit_transaction(channel_commit=false)
+ raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit
+ begin
+ if is_transactional? && valid? && !@need_channel_reset
+ if @rollback_only
+ @channel.tx_rollback
+ else
+ @channel.tx_commit
+ end
+ end
+ ensure
+ @rollback_only = false
+ @in_transaction = false
+ end
+ end
- def current_context
- if !@context.nil? && @context.need_new_context?
- @context = nil
+ def rollback_transaction
+ @rollback_only = true
+ commit_transaction
end
- @context ||= ChannelContext.new(connection)
- end
- private
+ def is_transactional?
+ @is_transactional
+ end
- class ChannelContext
- attr_reader :connection, :transaction_depth
+ def in_transaction?
+ @in_transaction
+ end
- def initialize(connection)
- @connection = connection
- @channel = connection.create_channel
- @transaction_depth = 0
- @is_transactional = false
- @rollback_only = false
- @need_channel_reset = false
- @connection_failed = false
+ def publish(destination, body, headers={}, properties={})
+ exchange, routing_key, props = *destination.publish_params(headers, properties)
+ with_channel(true) do |ch|
+ ch.basic_publish(body, exchange, routing_key, props)
+ end
end
- def is_transactional?
- @is_transactional
+ def pop_message(destination, options={})
+ raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
+
+ with_channel(false) do |ch|
+ queue = ch.queue(destination.name, passive: true)
+
+ message = queue.pop(ack: !!options[:client_ack])
+ if message.nil? || message[0].nil?
+ nil
+ else
+ args_to_message(*message)
+ end
+ end
end
- def connection_failed?
- @connection_failed
+ def supports_client_acks?
+ true
end
- def with_transaction(&block)
- if !is_transactional?
- @channel.tx_select
- @is_transactional = true
+ def ack_message(message, options={})
+ with_channel(true) do |ch|
+ ch.ack(message.delivery_tag)
end
+ end
- begin
- @transaction_depth += 1
- yield
- commit_transaction
- rescue
- rollback_transaction
- raise
- ensure
- @transaction_depth -= 1
+ def nack_message(message, options={})
+ requeue = options[:requeue].kind_of?(FalseClass) ? false : true
+ with_channel(true) do |ch|
+ ch.reject(message.delivery_tag, requeue)
end
end
+ def supports_subscriptions?
+ true
+ end
+
+ def subscribe(destination, options={}, &consumer)
+ sub = Subscription.new(adapter, destination, consumer, options)
+ sub.start
+ sub
+ end
+
+ def invalidate(in_unsubscribe=false)
+ super()
+ unless @subscription.nil? || in_unsubscribe
+ @subscription.unsubscribe
+ end
+ unless @channel.nil?
+ @channel.close if @channel.open?
+ end
+ end
+
def with_channel(require_commit=true)
raise MessageDriver::TransactionRollbackOnly if @rollback_only
- raise MessageDriver::Exception, "oh shit!" if @connection_failed
+ raise MessageDriver::Error, "oh nos!" if !valid?
+ @channel = adapter.connection.create_channel if @channel.nil?
reset_channel if @need_channel_reset
begin
result = yield @channel
- commit_transaction(true) if require_commit
+ commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
result
rescue Bunny::ChannelLevelException => e
@need_channel_reset = true
- @rollback_only = true if is_transactional?
+ @rollback_only = true if in_transaction?
if e.kind_of? Bunny::NotFound
- raise MessageDriver::QueueNotFound.new(e)
+ raise MessageDriver::QueueNotFound.new(e.to_s, e)
else
- raise MessageDriver::WrappedException.new(e)
+ raise MessageDriver::WrappedError.new(e.to_s, e)
end
- rescue Bunny::NetworkErrorWrapper, IOError => e
- @connection_failed = true
- @rollback_only = true if is_transactional?
- raise MessageDriver::ConnectionException.new(e)
+ rescue *NETWORK_ERRORS => e
+ @rollback_only = true if in_transaction?
+ raise MessageDriver::ConnectionError.new(e.to_s, e)
end
end
- def within_transaction?
- @transaction_depth > 0
+ def args_to_message(delivery_info, properties, payload)
+ Message.new(delivery_info, properties, payload)
end
- def need_new_context?
- if is_transactional?
- !within_transaction? && connection_failed?
- else
- connection_failed?
- end
- end
-
private
def reset_channel
unless @channel.open?
@channel.open
@is_transactional = false
+ @rollback_only = true if in_transaction?
end
@need_channel_reset = false
end
-
- def commit_transaction(from_channel=false)
- threshold = from_channel ? 0 : 1
- if is_transactional? && @transaction_depth <= threshold && !connection_failed?
- unless @need_channel_reset
- unless @rollback_only
- @channel.tx_commit
- else
- @channel.tx_rollback
- end
- end
- @rollback_only = false
- end
- end
-
- def rollback_transaction
- @rollback_only = true
- commit_transaction
- end
end
+ private
+
def validate_bunny_version
- required = Gem::Requirement.create('~> 0.9.0.pre7')
+ required = Gem::Requirement.create('>= 0.9.3')
current = Gem::Version.create(Bunny::VERSION)
unless required.satisfied_by? current
- raise MessageDriver::Exception, "bunny 0.9.0.pre7 or later is required for the bunny adapter"
+ raise MessageDriver::Error, "bunny 0.9.3 or later is required for the bunny adapter"
end
end
end
end
end