lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.3.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.4.0

- old
+ new

@@ -44,39 +44,39 @@ end end class QueueDestination < Destination def after_initialize(adapter_context) - unless @dest_options[:no_declare] + if @dest_options[:no_declare] + raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' if @name.empty? + raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' if @dest_options[:bindings] + else adapter_context.with_channel(false) do |ch| bunny_queue(ch, true) end - else - raise MessageDriver::Error, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty? - raise MessageDriver::Error, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings] end end def bunny_queue(channel, initialize=false) queue = channel.queue(@name, @dest_options) if initialize @name = queue.name - if bindings = @dest_options[:bindings] + if (bindings = @dest_options[:bindings]) bindings.each do |bnd| raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] queue.bind(bnd[:source], bnd[:args]||{}) end end end queue end def exchange_name - "" + '' end - def routing_key(properties) + def routing_key(_properties) @name end def message_count adapter.broker.client.current_adapter_context.with_channel(false) do |ch| @@ -91,18 +91,18 @@ end end class ExchangeDestination < Destination def after_initialize(adapter_context) - if declare = @dest_options[:declare] + if (declare = @dest_options[:declare]) adapter_context.with_channel(false) do |ch| type = declare.delete(:type) - raise MessageDriver::Error, "you must provide a valid exchange type" unless type + raise MessageDriver::Error, 'you must provide a valid exchange type' unless type ch.exchange_declare(@name, type, declare) end end - if bindings = @dest_options[:bindings] + if (bindings = @dest_options[:bindings]) adapter_context.with_channel(false) do |ch| bindings.each do |bnd| raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] ch.exchange_bind(bnd[:source], @name, bnd[:args]||{}) end @@ -111,11 +111,11 @@ end end class Subscription < Subscription::Base def start - raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination + raise MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' unless destination.is_a? QueueDestination @sub_ctx = adapter.new_subscription_context(self) @error_handler = options[:error_handler] @ack_mode = case options[:ack] when :auto, nil :auto @@ -139,14 +139,15 @@ @sub_ctx = nil end end private + def start_subscription @sub_ctx.with_channel do |ch| queue = destination.bunny_queue(@sub_ctx.channel) - if options.has_key? :prefetch_size + if options.key? :prefetch_size ch.prefetch(options[:prefetch_size]) end @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| adapter.broker.client.with_adapter_context(@sub_ctx) do message = @sub_ctx.args_to_message(delivery_info, properties, payload) @@ -211,11 +212,11 @@ end def stop begin super - @connection.close if !@connection.nil? + @connection.close unless @connection.nil? rescue => e logger.error "error while attempting connection close\n#{exception_to_str(e)}" ensure @connection = nil end @@ -261,57 +262,77 @@ end def begin_transaction(options={}) raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction? @in_transaction = true + @in_confirms_transaction = true if options[:type] == :confirm_and_wait end def commit_transaction(channel_commit=false) raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit begin - if is_transactional? && valid? && !@need_channel_reset - handle_errors do - if @rollback_only - @channel.tx_rollback - else - @channel.tx_commit + if @in_confirms_transaction + wait_for_confirms(@channel) unless @rollback_only + else + if is_transactional? && valid? && !@need_channel_reset + handle_errors do + if @rollback_only + @channel.tx_rollback + else + @channel.tx_commit + end end end end ensure @rollback_only = false @in_transaction = false + @in_confirms_transaction = false end end + def wait_for_confirms(channel) + until channel.unconfirmed_set.empty? + channel.wait_for_confirms + end + end + private :wait_for_confirms + def rollback_transaction @rollback_only = true commit_transaction end - def is_transactional? + def transactional? @is_transactional end + alias_method :is_transactional?, :transactional? def in_transaction? @in_transaction end def publish(destination, body, headers={}, properties={}) exchange, routing_key, props = *destination.publish_params(headers, properties) + confirm = props.delete(:confirm) + confirm = false if confirm.nil? with_channel(true) do |ch| + if confirm == true + ch.confirm_select unless ch.using_publisher_confirms? + end ch.basic_publish(body, exchange, routing_key, props) + ch.wait_for_confirms if confirm == true end end def pop_message(destination, options={}) raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination with_channel(false) do |ch| queue = ch.queue(destination.name, passive: true) - message = queue.pop(ack: !!options[:client_ack]) + message = queue.pop(ack: options.fetch(:client_ack, false)) if message.nil? || message[0].nil? nil else args_to_message(*message) end @@ -320,18 +341,18 @@ def supports_client_acks? true end - def ack_message(message, options={}) + def ack_message(message, _options={}) with_channel(true) do |ch| ch.ack(message.delivery_tag) end end def nack_message(message, options={}) - requeue = options[:requeue].kind_of?(FalseClass) ? false : true + requeue = options[:requeue].is_a?(FalseClass) ? false : true with_channel(true) do |ch| ch.reject(message.delivery_tag, requeue) end end @@ -369,11 +390,11 @@ begin yield rescue Bunny::ChannelLevelException => e @need_channel_reset = true @rollback_only = true if in_transaction? - if e.kind_of? Bunny::NotFound + if e.is_a? Bunny::NotFound raise MessageDriver::QueueNotFound.new(e.to_s, e) else raise MessageDriver::WrappedError.new(e.to_s, e) end rescue Bunny::ChannelAlreadyClosed => e @@ -387,16 +408,22 @@ end end def with_channel(require_commit=true) raise MessageDriver::TransactionRollbackOnly if @rollback_only - raise MessageDriver::Error, "this adapter context is not valid!" if !valid? + raise MessageDriver::Error, 'this adapter context is not valid!' unless valid? @channel = adapter.connection.create_channel if @channel.nil? reset_channel if @need_channel_reset - if in_transaction? && !is_transactional? - @channel.tx_select - @is_transactional = true + if in_transaction? + if @in_confirms_transaction + @channel.confirm_select unless @channel.using_publisher_confirmations? + else + unless is_transactional? + @channel.tx_select + @is_transactional = true + end + end end handle_errors do result = yield @channel commit_transaction(true) if require_commit && is_transactional? && !in_transaction? result @@ -428,13 +455,13 @@ logger.error exception_to_str(e) end end def validate_bunny_version - required = Gem::Requirement.create('>= 1.1.3') + required = Gem::Requirement.create('>= 1.2.2') current = Gem::Version.create(Bunny::VERSION) unless required.satisfied_by? current - raise MessageDriver::Error, "bunny 1.1.3 or later is required for the bunny adapter" + raise MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter' end end end end end