lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.6.1 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.7.0
- old
+ new
@@ -23,11 +23,11 @@
def initialize(ctx, delivery_info, properties, payload, destination)
raw_body = payload
raw_headers = properties[:headers]
raw_headers = {} if raw_headers.nil?
b, h, p = destination.middleware.on_consume(payload, raw_headers, properties)
- super(ctx, b, h, p, raw_body)
+ super(ctx, destination, b, h, p, raw_body)
@delivery_info = delivery_info
end
def delivery_tag
delivery_info.delivery_tag
@@ -57,35 +57,35 @@
class QueueDestination < Destination
def after_initialize(adapter_context)
if @dest_options[:no_declare]
if @name.empty?
- fail MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true'
+ raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true'
end
if @dest_options[:bindings]
- fail MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true'
+ raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true'
end
else
adapter_context.with_channel(false) do |ch|
bunny_queue(ch, init: true)
end
end
end
def bunny_queue(channel, options = {})
opts = @dest_options.dup
- opts.merge!(passive: options[:passive]) if options.key? :passive
+ opts[:passive] = options[:passive] if options.key? :passive
queue = channel.queue(@name, opts)
handle_queue_init(queue) if options.fetch(:init, false)
queue
end
def handle_queue_init(queue)
@name = queue.name
if (bindings = @dest_options[:bindings])
bindings.each do |bnd|
- fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
queue.bind(bnd[:source], bnd[:args] || {})
end
end
end
@@ -95,46 +95,46 @@
def routing_key(_properties)
@name
end
- def message_count
- adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
+ def handle_message_count
+ current_adapter_context.with_channel(false) do |ch|
bunny_queue(ch, passive: true).message_count
end
end
def subscribe(options = {}, &consumer)
- adapter.broker.client.current_adapter_context.subscribe(self, options, &consumer)
+ current_adapter_context.subscribe(self, options, &consumer)
end
- def consumer_count
- adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
+ def handle_consumer_count
+ current_adapter_context.with_channel(false) do |ch|
bunny_queue(ch, passive: true).consumer_count
end
end
def purge
- adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
+ current_adapter_context.with_channel(false) do |ch|
bunny_queue(ch).purge
end
end
end
class ExchangeDestination < Destination
def after_initialize(adapter_context)
if (declare = @dest_options[:declare])
adapter_context.with_channel(false) do |ch|
type = declare.delete(:type)
- fail MessageDriver::Error, 'you must provide a valid exchange type' unless type
+ raise MessageDriver::Error, 'you must provide a valid exchange type' unless type
ch.exchange_declare(@name, type, declare)
end
end
if (bindings = @dest_options[:bindings])
adapter_context.with_channel(false) do |ch|
bindings.each do |bnd|
- fail MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
+ raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
ch.exchange_bind(bnd[:source], @name, bnd[:args] || {})
end
end
end
end
@@ -143,12 +143,12 @@
class Subscription < Subscription::Base
attr_reader :sub_ctx, :error_handler
def start
unless destination.is_a? QueueDestination
- fail MessageDriver::Error,
- 'subscriptions are only supported with QueueDestinations'
+ raise MessageDriver::Error,
+ 'subscriptions are only supported with QueueDestinations'
end
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
@message_handler = case options.delete(:ack)
when :auto, nil
@@ -156,11 +156,11 @@
when :manual
ManualAckHandler.new(self)
when :transactional
TransactionalAckHandler.new(self)
else
- fail MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
+ raise MessageDriver::Error, "unrecognized :ack option #{options[:ack]}"
end
start_subscription
end
def unsubscribe
@@ -303,54 +303,50 @@
@need_channel_reset = false
@in_transaction = false
@require_commit = false
end
- def create_destination(name, dest_options = {}, message_props = {})
+ def handle_create_destination(name, dest_options = {}, message_props = {})
dest = case type = dest_options.delete(:type)
when :exchange
ExchangeDestination.new(adapter, name, dest_options, message_props)
when :queue, nil
QueueDestination.new(adapter, name, dest_options, message_props)
else
- fail MessageDriver::Error, "invalid destination type #{type}"
+ raise MessageDriver::Error, "invalid destination type #{type}"
end
dest.after_initialize(self)
dest
end
def supports_transactions?
true
end
- def begin_transaction(options = {})
+ def handle_begin_transaction(options = {})
if in_transaction?
- fail MessageDriver::TransactionError,
- "you can't begin another transaction, you are already in one!"
+ raise MessageDriver::TransactionError,
+ "you can't begin another transaction, you are already in one!"
end
@in_transaction = true
@in_confirms_transaction = true if options[:type] == :confirm_and_wait
end
- def commit_transaction
+ def handle_commit_transaction(_ = nil)
if !in_transaction? && !@require_commit
- fail MessageDriver::TransactionError,
- "you can't finish the transaction unless you already in one!"
+ raise MessageDriver::TransactionError,
+ "you can't finish the transaction unless you already in one!"
end
begin
if @in_confirms_transaction
- unless @rollback_only || @channel.nil?
- @channel.wait_for_confirms
- end
- else
- if is_transactional? && valid? && !@need_channel_reset && @require_commit
- handle_errors do
- if @rollback_only
- @channel.tx_rollback
- else
- @channel.tx_commit
- end
+ @channel.wait_for_confirms unless @rollback_only || @channel.nil?
+ elsif is_transactional? && valid? && !@need_channel_reset && @require_commit
+ handle_errors do
+ if @rollback_only
+ @channel.tx_rollback
+ else
+ @channel.tx_commit
end
end
end
ensure
@rollback_only = false
@@ -358,25 +354,25 @@
@in_confirms_transaction = false
@require_commit = false
end
end
- def rollback_transaction
+ def handle_rollback_transaction(_ = nil)
@rollback_only = true
commit_transaction
end
def transactional?
@is_transactional
end
- alias_method :is_transactional?, :transactional?
+ alias is_transactional? transactional?
def in_transaction?
@in_transaction
end
- def publish(destination, body, headers = {}, properties = {})
+ def handle_publish(destination, body, headers = {}, properties = {})
body, exchange, routing_key, props = *destination.publish_params(body, headers, properties)
confirm = props.delete(:confirm)
confirm = false if confirm.nil?
with_channel(true) do |ch|
if confirm == true
@@ -385,12 +381,12 @@
ch.basic_publish(body, exchange, routing_key, props)
ch.wait_for_confirms if confirm == true
end
end
- def pop_message(destination, options = {})
- fail MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
+ def handle_pop_message(destination, options = {})
+ raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
with_channel(false) do |ch|
queue = ch.queue(destination.name, passive: true)
message = queue.pop(adapter.ack_key => options.fetch(:client_ack, false))
@@ -404,33 +400,49 @@
def supports_client_acks?
true
end
- def ack_message(message, _options = {})
+ def handle_ack_message(message, _options = {})
with_channel(true) do |ch|
ch.ack(message.delivery_tag)
end
end
- def nack_message(message, options = {})
+ def handle_nack_message(message, options = {})
requeue = options.fetch(:requeue, true)
with_channel(true) do |ch|
ch.reject(message.delivery_tag, requeue)
end
end
def supports_subscriptions?
true
end
- def subscribe(destination, options = {}, &consumer)
+ def handle_subscribe(destination, options = {}, &consumer)
sub = Subscription.new(adapter, destination, consumer, options)
sub.start
sub
end
+ def handle_message_count(destination)
+ if destination.respond_to?(:handle_message_count)
+ destination.handle_message_count
+ else
+ super
+ end
+ end
+
+ def handle_consumer_count(destination)
+ if destination.respond_to?(:handle_consumer_count)
+ destination.handle_consumer_count
+ else
+ super
+ end
+ end
+
def invalidate(in_unsubscribe = false)
super()
unless @subscription.nil? || in_unsubscribe
begin
@subscription.unsubscribe if adapter.connection.open?
@@ -486,12 +498,12 @@
@is_transactional = true
end
end
def with_channel(require_commit = true)
- fail MessageDriver::TransactionRollbackOnly if @rollback_only
- fail MessageDriver::Error, 'this adapter context is not valid!' unless valid?
+ raise MessageDriver::TransactionRollbackOnly if @rollback_only
+ raise MessageDriver::Error, 'this adapter context is not valid!' unless valid?
ensure_channel
@require_commit ||= require_commit
if in_transaction?
if @in_confirms_transaction
@channel.confirm_select unless @channel.using_publisher_confirmations?
@@ -533,10 +545,10 @@
def validate_bunny_version
required = Gem::Requirement.create('>= 1.7.0')
current = Gem::Version.create(Bunny::VERSION)
unless required.satisfied_by? current
- fail MessageDriver::Error, 'bunny 1.7.0 or later is required for the bunny adapter'
+ raise MessageDriver::Error, 'bunny 1.7.0 or later is required for the bunny adapter'
end
end
end
end
end