lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.2
- old
+ new
@@ -1,6 +1,7 @@
require 'bunny'
+require 'bunny/session_patch'
module MessageDriver
class Broker
def bunny_adapter
MessageDriver::Adapters::BunnyAdapter
@@ -114,83 +115,93 @@
class Subscription < Subscription::Base
def start
raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
+ @ack_mode = case options[:ack]
+ when :auto, nil
+ :auto
+ when :manual
+ :manual
+ when :transactional
+ :transactional
+ else
+ raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
+ end
+ start_subscription
+ end
+
+ def unsubscribe
+ unless @bunny_consumer.nil?
+ @bunny_consumer.cancel
+ @bunny_consumer = nil
+ end
+ unless @sub_ctx.nil?
+ @sub_ctx.invalidate(true)
+ @sub_ctx = nil
+ end
+ end
+
+ private
+ def start_subscription
@sub_ctx.with_channel do |ch|
queue = destination.bunny_queue(@sub_ctx.channel)
if options.has_key? :prefetch_size
ch.prefetch(options[:prefetch_size])
end
- ack_mode = case options[:ack]
- when :auto, nil
- :auto
- when :manual
- :manual
- when :transactional
- :transactional
- else
- raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
- end
@bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
- message = @sub_ctx.args_to_message(delivery_info, properties, payload)
Client.with_adapter_context(@sub_ctx) do
+ message = @sub_ctx.args_to_message(delivery_info, properties, payload)
+ handle_message(message)
+ end
+ end
+ end
+ end
+
+ def handle_message(message)
+ begin
+ case @ack_mode
+ when :auto
+ consumer.call(message)
+ @sub_ctx.ack_message(message)
+ when :manual
+ consumer.call(message)
+ when :transactional
+ Client.with_message_transaction do
+ consumer.call(message)
+ @sub_ctx.ack_message(message)
+ end
+ end
+ rescue => e
+ if [:auto, :transactional].include? @ack_mode
+ requeue = true
+ if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
+ requeue = false
+ end
+ if @sub_ctx.valid?
begin
- case ack_mode
- when :auto
- consumer.call(message)
- @sub_ctx.ack_message(message)
- when :manual
- consumer.call(message)
- when :transactional
- Client.with_message_transaction do
- consumer.call(message)
- @sub_ctx.ack_message(message)
- end
- end
+ @sub_ctx.nack_message(message, requeue: requeue)
rescue => e
- if [:auto, :transactional].include? ack_mode
- requeue = true
- if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?)
- requeue = false
- end
- if @sub_ctx.valid?
- begin
- @sub_ctx.nack_message(message, requeue: requeue)
- rescue => e
- logger.error exception_to_str(e)
- end
- end
- end
- @error_handler.call(e, message) unless @error_handler.nil?
+ logger.error exception_to_str(e)
end
end
end
+ @error_handler.call(e, message) unless @error_handler.nil?
end
end
-
- def unsubscribe
- unless @bunny_consumer.nil?
- @bunny_consumer.cancel
- @bunny_consumer = nil
- end
- unless @sub_ctx.nil?
- @sub_ctx.invalidate(true)
- @sub_ctx = nil
- end
- end
end
def initialize(config)
validate_bunny_version
@config = config
- start_connection_thread
+ @handle_connection_errors = config.fetch(:handle_connection_errors, true)
+ initialize_connection
end
def connection(ensure_started=true)
- start_connection_thread
- if ensure_started && !@connection.open?
+ initialize_connection
+ if ensure_started
begin
@connection.start
rescue *NETWORK_ERRORS => e
raise MessageDriver::ConnectionError.new(e.to_s, e)
end
@@ -202,10 +213,14 @@
begin
super
@connection.close if !@connection.nil? && @connection.open?
rescue *NETWORK_ERRORS => e
logger.error "error while attempting connection close\n#{exception_to_str(e)}"
+ ensure
+ conn = @connection
+ @connection = nil
+ conn.cleanup_threads unless conn.nil?
end
end
def build_context
BunnyContext.new(self)
@@ -338,14 +353,24 @@
end
def invalidate(in_unsubscribe=false)
super()
unless @subscription.nil? || in_unsubscribe
- @subscription.unsubscribe
+ begin
+ @subscription.unsubscribe if adapter.connection.open?
+ rescue => e
+ logger.debug "error trying to end subscription\n#{exception_to_str(e)}"
+ end
end
unless @channel.nil?
- @channel.close if @channel.open?
+ begin
+ @channel.close if @channel.open? && adapter.connection.open?
+ rescue => e
+ logger.debug "error trying to close channel\n#{exception_to_str(e)}"
+ ensure
+ begin @channel.maybe_kill_consumer_work_pool! rescue nil; end
+ end
end
end
def handle_errors
begin
@@ -387,40 +412,57 @@
private
def reset_channel
unless @channel.open?
- @channel.open
+ @channel = adapter.connection.create_channel
@is_transactional = false
@rollback_only = true if in_transaction?
end
@need_channel_reset = false
end
end
private
- def start_connection_thread
- @connection_thread ||= Thread.new do
- begin
- @connection = Bunny.new(@config)
- sleep
- rescue *NETWORK_ERRORS => e
- logger.error "error on connection\n#{exception_to_str(e)}"
- begin
- while true
- @connection.start
+ def log_errors
+ begin
+ yield
+ rescue => e
+ logger.error exception_to_str(e)
+ end
+ end
+
+ def initialize_connection
+ if @handle_connection_errors
+ if @connection_thread.nil?
+ #hi mom!
+ @connection_thread = Thread.new do
+ @connection = Bunny.new(@config)
+ begin
sleep
+ rescue *NETWORK_ERRORS => e
+ logger.error "error on connection\n#{exception_to_str(e)}"
+ if @connection.automatically_recover?
+ sleep 0.1
+ unless @connection.recovering_from_network_failure?
+ stop
+ end
+ else
+ stop
+ end
+ retry
+ rescue => e
+ logger.error "unhandled error in connection thread! #{exception_to_str(e)}"
end
- rescue *NETWORK_ERRORS => e
- logger.error "error trying to restart connection\n#{exception_to_str(e)}"
- sleep 1
- retry
end
+ @connection_thread.abort_on_exception = true
+ sleep 0.1
end
- @connection_thread = nil
+ sleep 0.1 while @connection_thread.status != 'sleep'
+ else
+ @connection ||= Bunny.new(@config)
end
- sleep 0.1 while @connection_thread.status != 'sleep'
end
def validate_bunny_version
required = Gem::Requirement.create('>= 0.10.8')
current = Gem::Version.create(Bunny::VERSION)