lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.2 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.3.0

- old
+ new

@@ -1,7 +1,6 @@ require 'bunny' -require 'bunny/session_patch' module MessageDriver class Broker def bunny_adapter MessageDriver::Adapters::BunnyAdapter @@ -13,12 +12,12 @@ NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze class Message < MessageDriver::Message::Base attr_reader :delivery_info - def initialize(delivery_info, properties, payload) - super(payload, properties[:headers]||{}, properties) + def initialize(ctx, delivery_info, properties, payload) + super(ctx, payload, properties[:headers]||{}, properties) @delivery_info = delivery_info end def delivery_tag delivery_info.delivery_tag @@ -78,17 +77,17 @@ def routing_key(properties) @name end def message_count - Client.current_adapter_context.with_channel(false) do |ch| + adapter.broker.client.current_adapter_context.with_channel(false) do |ch| ch.queue(@name, @dest_options.merge(passive: true)).message_count end end def purge - Client.current_adapter_context.with_channel(false) do |ch| + adapter.broker.client.current_adapter_context.with_channel(false) do |ch| bunny_queue(ch).purge end end end @@ -147,11 +146,11 @@ queue = destination.bunny_queue(@sub_ctx.channel) if options.has_key? :prefetch_size ch.prefetch(options[:prefetch_size]) end @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| - Client.with_adapter_context(@sub_ctx) do + adapter.broker.client.with_adapter_context(@sub_ctx) do message = @sub_ctx.args_to_message(delivery_info, properties, payload) handle_message(message) end end end @@ -164,11 +163,11 @@ consumer.call(message) @sub_ctx.ack_message(message) when :manual consumer.call(message) when :transactional - Client.with_message_transaction do + adapter.broker.client.with_message_transaction do consumer.call(message) @sub_ctx.ack_message(message) end end rescue => e @@ -188,39 +187,39 @@ @error_handler.call(e, message) unless @error_handler.nil? end end end - def initialize(config) + def initialize(broker, config) validate_bunny_version + @broker = broker @config = config - @handle_connection_errors = config.fetch(:handle_connection_errors, true) - initialize_connection end def connection(ensure_started=true) - initialize_connection if ensure_started begin + @connection ||= Bunny::Session.new(@config) @connection.start rescue *NETWORK_ERRORS => e raise MessageDriver::ConnectionError.new(e.to_s, e) + rescue => e + stop + raise e end end @connection end def stop begin super - @connection.close if !@connection.nil? && @connection.open? - rescue *NETWORK_ERRORS => e + @connection.close if !@connection.nil? + rescue => e logger.error "error while attempting connection close\n#{exception_to_str(e)}" ensure - conn = @connection @connection = nil - conn.cleanup_threads unless conn.nil? end end def build_context BunnyContext.new(self) @@ -261,16 +260,10 @@ true end def begin_transaction(options={}) raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction? - unless is_transactional? - with_channel(false) do |ch| - ch.tx_select - end - @is_transactional = true - end @in_transaction = true end def commit_transaction(channel_commit=false) raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit @@ -394,22 +387,26 @@ end end def with_channel(require_commit=true) raise MessageDriver::TransactionRollbackOnly if @rollback_only - raise MessageDriver::Error, "oh nos!" if !valid? + raise MessageDriver::Error, "this adapter context is not valid!" if !valid? @channel = adapter.connection.create_channel if @channel.nil? reset_channel if @need_channel_reset + if in_transaction? && !is_transactional? + @channel.tx_select + @is_transactional = true + end handle_errors do result = yield @channel commit_transaction(true) if require_commit && is_transactional? && !in_transaction? result end end def args_to_message(delivery_info, properties, payload) - Message.new(delivery_info, properties, payload) + Message.new(self, delivery_info, properties, payload) end private def reset_channel @@ -430,46 +427,14 @@ rescue => e logger.error exception_to_str(e) end end - def initialize_connection - if @handle_connection_errors - if @connection_thread.nil? - #hi mom! - @connection_thread = Thread.new do - @connection = Bunny.new(@config) - begin - sleep - rescue *NETWORK_ERRORS => e - logger.error "error on connection\n#{exception_to_str(e)}" - if @connection.automatically_recover? - sleep 0.1 - unless @connection.recovering_from_network_failure? - stop - end - else - stop - end - retry - rescue => e - logger.error "unhandled error in connection thread! #{exception_to_str(e)}" - end - end - @connection_thread.abort_on_exception = true - sleep 0.1 - end - sleep 0.1 while @connection_thread.status != 'sleep' - else - @connection ||= Bunny.new(@config) - end - end - def validate_bunny_version - required = Gem::Requirement.create('>= 0.10.8') + required = Gem::Requirement.create('>= 1.1.3') current = Gem::Version.create(Bunny::VERSION) unless required.satisfied_by? current - raise MessageDriver::Error, "bunny 0.10.8 or later is required for the bunny adapter" + raise MessageDriver::Error, "bunny 1.1.3 or later is required for the bunny adapter" end end end end end