lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.1.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.0.rc1

- old
+ new

@@ -7,25 +7,34 @@ end end module Adapters class BunnyAdapter < Base + NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze class Message < MessageDriver::Message::Base attr_reader :delivery_info def initialize(delivery_info, properties, payload) super(payload, properties[:headers]||{}, properties) @delivery_info = delivery_info end + + def delivery_tag + delivery_info.delivery_tag + end + + def redelivered? + delivery_info.redelivered? + end end class Destination < MessageDriver::Destination::Base - def publish(body, headers={}, properties={}) + def publish_params(headers, properties) props = @message_props.merge(properties) props[:headers] = headers if headers - @adapter.publish(body, exchange_name, routing_key(properties), props) + [exchange_name, routing_key(properties), props] end def exchange_name @name end @@ -34,236 +43,351 @@ properties[:routing_key] end end class QueueDestination < Destination - def after_initialize + def after_initialize(adapter_context) unless @dest_options[:no_declare] - @adapter.current_context.with_channel(false) do |ch| - queue = ch.queue(@name, @dest_options) - @name = queue.name - if bindings = @dest_options[:bindings] - bindings.each do |bnd| - raise MessageDriver::Exception, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] - queue.bind(bnd[:source], bnd[:args]||{}) - end - end + adapter_context.with_channel(false) do |ch| + bunny_queue(ch, true) end else - raise MessageDriver::Exception, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty? - raise MessageDriver::Exception, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings] + raise MessageDriver::Error, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty? + raise MessageDriver::Error, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings] end end + def bunny_queue(channel, initialize=false) + queue = channel.queue(@name, @dest_options) + if initialize + @name = queue.name + if bindings = @dest_options[:bindings] + bindings.each do |bnd| + raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + queue.bind(bnd[:source], bnd[:args]||{}) + end + end + end + queue + end + def exchange_name "" end def routing_key(properties) @name end def message_count - @adapter.current_context.with_channel(false) do |ch| + Client.current_adapter_context.with_channel(false) do |ch| ch.queue(@name, @dest_options.merge(passive: true)).message_count end end - end - class ExchangeDestination < Destination - def pop_message(destination, options={}) - raise MessageDriver::Exception, "You can't pop a message off an exchange" + def purge + Client.current_adapter_context.with_channel(false) do |ch| + bunny_queue(ch).purge + end end + end - def after_initialize + class ExchangeDestination < Destination + def after_initialize(adapter_context) if declare = @dest_options[:declare] - @adapter.current_context.with_channel(false) do |ch| + adapter_context.with_channel(false) do |ch| type = declare.delete(:type) - raise MessageDriver::Exception, "you must provide a valid exchange type" unless type + raise MessageDriver::Error, "you must provide a valid exchange type" unless type ch.exchange_declare(@name, type, declare) end end if bindings = @dest_options[:bindings] - @adapter.current_context.with_channel(false) do |ch| + adapter_context.with_channel(false) do |ch| bindings.each do |bnd| - raise MessageDriver::Exception, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] + raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source] ch.exchange_bind(bnd[:source], @name, bnd[:args]||{}) end end end end end + class Subscription < Subscription::Base + def start + raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination + @sub_ctx = adapter.new_subscription_context(self) + @error_handler = options[:error_handler] + @sub_ctx.with_channel do |ch| + queue = destination.bunny_queue(@sub_ctx.channel) + ack_mode = case options[:ack] + when :auto, nil + :auto + when :manual + :manual + when :transactional + :transactional + else + raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}" + end + @bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload| + message = @sub_ctx.args_to_message(delivery_info, properties, payload) + Client.with_adapter_context(@sub_ctx) do + begin + case ack_mode + when :auto + consumer.call(message) + @sub_ctx.ack_message(message) + when :manual + consumer.call(message) + when :transactional + Client.with_message_transaction do + consumer.call(message) + @sub_ctx.ack_message(message) + end + end + rescue => e + if e.is_a?(DontRequeue) || (options[:retry_redelivered] == false && message.redelivered?) + if [:auto, :transactional].include? ack_mode + @sub_ctx.nack_message(message, requeue: false) + end + else + if @sub_ctx.valid? && ack_mode == :auto + begin + @sub_ctx.nack_message(message, requeue: true) + rescue => e + logger.error exception_to_str(e) + end + end + @error_handler.call(e, message) unless @error_handler.nil? + end + end + end + end + end + end + + def unsubscribe + unless @bunny_consumer.nil? + @bunny_consumer.cancel + @bunny_consumer = nil + end + unless @sub_ctx.nil? + @sub_ctx.invalidate(true) + @sub_ctx = nil + end + end + end + def initialize(config) validate_bunny_version - - @connection = Bunny.new(config.merge(threaded: false)) + @config = config end def connection(ensure_started=true) + @connection ||= Bunny.new(@config) if ensure_started && !@connection.open? - @connection.start + begin + @connection.start + rescue *NETWORK_ERRORS => e + raise MessageDriver::ConnectionError.new(e.to_s, e) + end end @connection end - def publish(body, exchange, routing_key, properties) - current_context.with_channel(true) do |ch| - ch.basic_publish(body, exchange, routing_key, properties) + def stop + begin + super + @connection.close if !@connection.nil? && @connection.open? + rescue *NETWORK_ERRORS => e + logger.error "error while attempting connection close\n#{exception_to_str(e)}" end end - def pop_message(destination, options={}) - current_context.with_channel do |ch| - queue = ch.queue(destination, passive: true) + def build_context + BunnyContext.new(self) + end - message = queue.pop - if message.nil? || message[0].nil? - nil + def new_subscription_context(subscription) + ctx = new_context + ctx.channel = connection.create_channel + ctx.subscription = subscription + ctx + end + + class BunnyContext < ContextBase + attr_accessor :channel, :subscription + + def initialize(adapter) + super(adapter) + @is_transactional = false + @rollback_only = false + @need_channel_reset = false + @in_transaction = false + end + + def create_destination(name, dest_options={}, message_props={}) + dest = case type = dest_options.delete(:type) + when :exchange + ExchangeDestination.new(self.adapter, name, dest_options, message_props) + when :queue, nil + QueueDestination.new(self.adapter, name, dest_options, message_props) else - Message.new(*message) + raise MessageDriver::Error, "invalid destination type #{type}" end + dest.after_initialize(self) + dest end - end - def create_destination(name, dest_options={}, message_props={}) - case type = dest_options.delete(:type) - when :exchange - ExchangeDestination.new(self, name, dest_options, message_props) - when :queue, nil - QueueDestination.new(self, name, dest_options, message_props) - else - raise MessageDriver::Exception, "invalid destination type #{type}" + def supports_transactions? + true end - end - def with_transaction(options={}, &block) - current_context.with_transaction(&block) - end + def begin_transaction(options={}) + raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction? + unless is_transactional? + with_channel(false) do |ch| + ch.tx_select + end + @is_transactional = true + end + @in_transaction = true + end - def stop - @connection.close if @connection.open? - @context = nil - end + def commit_transaction(channel_commit=false) + raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit + begin + if is_transactional? && valid? && !@need_channel_reset + if @rollback_only + @channel.tx_rollback + else + @channel.tx_commit + end + end + ensure + @rollback_only = false + @in_transaction = false + end + end - def current_context - if !@context.nil? && @context.need_new_context? - @context = nil + def rollback_transaction + @rollback_only = true + commit_transaction end - @context ||= ChannelContext.new(connection) - end - private + def is_transactional? + @is_transactional + end - class ChannelContext - attr_reader :connection, :transaction_depth + def in_transaction? + @in_transaction + end - def initialize(connection) - @connection = connection - @channel = connection.create_channel - @transaction_depth = 0 - @is_transactional = false - @rollback_only = false - @need_channel_reset = false - @connection_failed = false + def publish(destination, body, headers={}, properties={}) + exchange, routing_key, props = *destination.publish_params(headers, properties) + with_channel(true) do |ch| + ch.basic_publish(body, exchange, routing_key, props) + end end - def is_transactional? - @is_transactional + def pop_message(destination, options={}) + raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination + + with_channel(false) do |ch| + queue = ch.queue(destination.name, passive: true) + + message = queue.pop(ack: !!options[:client_ack]) + if message.nil? || message[0].nil? + nil + else + args_to_message(*message) + end + end end - def connection_failed? - @connection_failed + def supports_client_acks? + true end - def with_transaction(&block) - if !is_transactional? - @channel.tx_select - @is_transactional = true + def ack_message(message, options={}) + with_channel(true) do |ch| + ch.ack(message.delivery_tag) end + end - begin - @transaction_depth += 1 - yield - commit_transaction - rescue - rollback_transaction - raise - ensure - @transaction_depth -= 1 + def nack_message(message, options={}) + requeue = options[:requeue].kind_of?(FalseClass) ? false : true + with_channel(true) do |ch| + ch.reject(message.delivery_tag, requeue) end end + def supports_subscriptions? + true + end + + def subscribe(destination, options={}, &consumer) + sub = Subscription.new(adapter, destination, consumer, options) + sub.start + sub + end + + def invalidate(in_unsubscribe=false) + super() + unless @subscription.nil? || in_unsubscribe + @subscription.unsubscribe + end + unless @channel.nil? + @channel.close if @channel.open? + end + end + def with_channel(require_commit=true) raise MessageDriver::TransactionRollbackOnly if @rollback_only - raise MessageDriver::Exception, "oh shit!" if @connection_failed + raise MessageDriver::Error, "oh nos!" if !valid? + @channel = adapter.connection.create_channel if @channel.nil? reset_channel if @need_channel_reset begin result = yield @channel - commit_transaction(true) if require_commit + commit_transaction(true) if require_commit && is_transactional? && !in_transaction? result rescue Bunny::ChannelLevelException => e @need_channel_reset = true - @rollback_only = true if is_transactional? + @rollback_only = true if in_transaction? if e.kind_of? Bunny::NotFound - raise MessageDriver::QueueNotFound.new(e) + raise MessageDriver::QueueNotFound.new(e.to_s, e) else - raise MessageDriver::WrappedException.new(e) + raise MessageDriver::WrappedError.new(e.to_s, e) end - rescue Bunny::NetworkErrorWrapper, IOError => e - @connection_failed = true - @rollback_only = true if is_transactional? - raise MessageDriver::ConnectionException.new(e) + rescue *NETWORK_ERRORS => e + @rollback_only = true if in_transaction? + raise MessageDriver::ConnectionError.new(e.to_s, e) end end - def within_transaction? - @transaction_depth > 0 + def args_to_message(delivery_info, properties, payload) + Message.new(delivery_info, properties, payload) end - def need_new_context? - if is_transactional? - !within_transaction? && connection_failed? - else - connection_failed? - end - end - private def reset_channel unless @channel.open? @channel.open @is_transactional = false + @rollback_only = true if in_transaction? end @need_channel_reset = false end - - def commit_transaction(from_channel=false) - threshold = from_channel ? 0 : 1 - if is_transactional? && @transaction_depth <= threshold && !connection_failed? - unless @need_channel_reset - unless @rollback_only - @channel.tx_commit - else - @channel.tx_rollback - end - end - @rollback_only = false - end - end - - def rollback_transaction - @rollback_only = true - commit_transaction - end end + private + def validate_bunny_version - required = Gem::Requirement.create('~> 0.9.0.pre7') + required = Gem::Requirement.create('>= 0.9.3') current = Gem::Version.create(Bunny::VERSION) unless required.satisfied_by? current - raise MessageDriver::Exception, "bunny 0.9.0.pre7 or later is required for the bunny adapter" + raise MessageDriver::Error, "bunny 0.9.3 or later is required for the bunny adapter" end end end end end