lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.2

- old
+ new

@@ -1,6 +1,7 @@ require 'bunny' +require 'bunny/session_patch' module MessageDriver class Broker def bunny_adapter MessageDriver::Adapters::BunnyAdapter @@ -114,83 +115,93 @@ class Subscription < Subscription::Base def start raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] + @ack_mode = case options[:ack] + when :auto, nil + :auto + when :manual + :manual + when :transactional + :transactional + else + raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" + end + start_subscription + end + + def unsubscribe + unless @bunny_consumer.nil? + @bunny_consumer.cancel + @bunny_consumer = nil + end + unless @sub_ctx.nil? + @sub_ctx.invalidate(true) + @sub_ctx = nil + end + end + + private + def start_subscription @sub_ctx.with_channel do |ch| queue = destination.bunny_queue(@sub_ctx.channel) if options.has_key? :prefetch_size ch.prefetch(options[:prefetch_size]) end - ack_mode = case options[:ack] - when :auto, nil - :auto - when :manual - :manual - when :transactional - :transactional - else - raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" - end @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| - message = @sub_ctx.args_to_message(delivery_info, properties, payload) Client.with_adapter_context(@sub_ctx) do + message = @sub_ctx.args_to_message(delivery_info, properties, payload) + handle_message(message) + end + end + end + end + + def handle_message(message) + begin + case @ack_mode + when :auto + consumer.call(message) + @sub_ctx.ack_message(message) + when :manual + consumer.call(message) + when :transactional + Client.with_message_transaction do + consumer.call(message) + @sub_ctx.ack_message(message) + end + end + rescue => e + if [:auto, :transactional].include? @ack_mode + requeue = true + if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) + requeue = false + end + if @sub_ctx.valid? begin - case ack_mode - when :auto - consumer.call(message) - @sub_ctx.ack_message(message) - when :manual - consumer.call(message) - when :transactional - Client.with_message_transaction do - consumer.call(message) - @sub_ctx.ack_message(message) - end - end + @sub_ctx.nack_message(message, requeue: requeue) rescue => e - if [:auto, :transactional].include? ack_mode - requeue = true - if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) - requeue = false - end - if @sub_ctx.valid? - begin - @sub_ctx.nack_message(message, requeue: requeue) - rescue => e - logger.error exception_to_str(e) - end - end - end - @error_handler.call(e, message) unless @error_handler.nil? + logger.error exception_to_str(e) end end end + @error_handler.call(e, message) unless @error_handler.nil? end end - - def unsubscribe - unless @bunny_consumer.nil? - @bunny_consumer.cancel - @bunny_consumer = nil - end - unless @sub_ctx.nil? - @sub_ctx.invalidate(true) - @sub_ctx = nil - end - end end def initialize(config) validate_bunny_version @config = config - start_connection_thread + @handle_connection_errors = config.fetch(:handle_connection_errors, true) + initialize_connection end def connection(ensure_started=true) - start_connection_thread - if ensure_started && !@connection.open? + initialize_connection + if ensure_started begin @connection.start rescue *NETWORK_ERRORS => e raise MessageDriver::ConnectionError.new(e.to_s, e) end @@ -202,10 +213,14 @@ begin super @connection.close if !@connection.nil? && @connection.open? rescue *NETWORK_ERRORS => e logger.error "error while attempting connection close\n#{exception_to_str(e)}" + ensure + conn = @connection + @connection = nil + conn.cleanup_threads unless conn.nil? end end def build_context BunnyContext.new(self) @@ -338,14 +353,24 @@ end def invalidate(in_unsubscribe=false) super() unless @subscription.nil? || in_unsubscribe - @subscription.unsubscribe + begin + @subscription.unsubscribe if adapter.connection.open? + rescue => e + logger.debug "error trying to end subscription\n#{exception_to_str(e)}" + end end unless @channel.nil? - @channel.close if @channel.open? + begin + @channel.close if @channel.open? && adapter.connection.open? + rescue => e + logger.debug "error trying to close channel\n#{exception_to_str(e)}" + ensure + begin @channel.maybe_kill_consumer_work_pool! rescue nil; end + end end end def handle_errors begin @@ -387,40 +412,57 @@ private def reset_channel unless @channel.open? - @channel.open + @channel = adapter.connection.create_channel @is_transactional = false @rollback_only = true if in_transaction? end @need_channel_reset = false end end private - def start_connection_thread - @connection_thread ||= Thread.new do - begin - @connection = Bunny.new(@config) - sleep - rescue *NETWORK_ERRORS => e - logger.error "error on connection\n#{exception_to_str(e)}" - begin - while true - @connection.start + def log_errors + begin + yield + rescue => e + logger.error exception_to_str(e) + end + end + + def initialize_connection + if @handle_connection_errors + if @connection_thread.nil? + #hi mom! + @connection_thread = Thread.new do + @connection = Bunny.new(@config) + begin sleep + rescue *NETWORK_ERRORS => e + logger.error "error on connection\n#{exception_to_str(e)}" + if @connection.automatically_recover? + sleep 0.1 + unless @connection.recovering_from_network_failure? + stop + end + else + stop + end + retry + rescue => e + logger.error "unhandled error in connection thread! #{exception_to_str(e)}" end - rescue *NETWORK_ERRORS => e - logger.error "error trying to restart connection\n#{exception_to_str(e)}" - sleep 1 - retry end + @connection_thread.abort_on_exception = true + sleep 0.1 end - @connection_thread = nil + sleep 0.1 while @connection_thread.status != 'sleep' + else + @connection ||= Bunny.new(@config) end - sleep 0.1 while @connection_thread.status != 'sleep' end def validate_bunny_version required = Gem::Requirement.create('>= 0.10.8') current = Gem::Version.create(Bunny::VERSION)