lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.3.0 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.4.0
- old
+ new
@@ -44,39 +44,39 @@
end
end
class QueueDestination < Destination
def after_initialize(adapter_context)
- unless @dest_options[:no_declare]
+ if @dest_options[:no_declare]
+ raise MessageDriver::Error, 'server-named queues must be declared, but you provided :no_declare => true' if @name.empty?
+ raise MessageDriver::Error, 'queues with bindings must be declared, but you provided :no_declare => true' if @dest_options[:bindings]
+ else
adapter_context.with_channel(false) do |ch|
bunny_queue(ch, true)
end
- else
- raise MessageDriver::Error, "server-named queues must be declared, but you provided :no_declare => true" if @name.empty?
- raise MessageDriver::Error, "queues with bindings must be declared, but you provided :no_declare => true" if @dest_options[:bindings]
end
end
def bunny_queue(channel, initialize=false)
queue = channel.queue(@name, @dest_options)
if initialize
@name = queue.name
- if bindings = @dest_options[:bindings]
+ if (bindings = @dest_options[:bindings])
bindings.each do |bnd|
raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
queue.bind(bnd[:source], bnd[:args]||{})
end
end
end
queue
end
def exchange_name
- ""
+ ''
end
- def routing_key(properties)
+ def routing_key(_properties)
@name
end
def message_count
adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
@@ -91,18 +91,18 @@
end
end
class ExchangeDestination < Destination
def after_initialize(adapter_context)
- if declare = @dest_options[:declare]
+ if (declare = @dest_options[:declare])
adapter_context.with_channel(false) do |ch|
type = declare.delete(:type)
- raise MessageDriver::Error, "you must provide a valid exchange type" unless type
+ raise MessageDriver::Error, 'you must provide a valid exchange type' unless type
ch.exchange_declare(@name, type, declare)
end
end
- if bindings = @dest_options[:bindings]
+ if (bindings = @dest_options[:bindings])
adapter_context.with_channel(false) do |ch|
bindings.each do |bnd|
raise MessageDriver::Error, "binding #{bnd.inspect} must provide a source!" unless bnd[:source]
ch.exchange_bind(bnd[:source], @name, bnd[:args]||{})
end
@@ -111,11 +111,11 @@
end
end
class Subscription < Subscription::Base
def start
- raise MessageDriver::Error, "subscriptions are only supported with QueueDestinations" unless destination.is_a? QueueDestination
+ raise MessageDriver::Error, 'subscriptions are only supported with QueueDestinations' unless destination.is_a? QueueDestination
@sub_ctx = adapter.new_subscription_context(self)
@error_handler = options[:error_handler]
@ack_mode = case options[:ack]
when :auto, nil
:auto
@@ -139,14 +139,15 @@
@sub_ctx = nil
end
end
private
+
def start_subscription
@sub_ctx.with_channel do |ch|
queue = destination.bunny_queue(@sub_ctx.channel)
- if options.has_key? :prefetch_size
+ if options.key? :prefetch_size
ch.prefetch(options[:prefetch_size])
end
@bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
adapter.broker.client.with_adapter_context(@sub_ctx) do
message = @sub_ctx.args_to_message(delivery_info, properties, payload)
@@ -211,11 +212,11 @@
end
def stop
begin
super
- @connection.close if !@connection.nil?
+ @connection.close unless @connection.nil?
rescue => e
logger.error "error while attempting connection close\n#{exception_to_str(e)}"
ensure
@connection = nil
end
@@ -261,57 +262,77 @@
end
def begin_transaction(options={})
raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction?
@in_transaction = true
+ @in_confirms_transaction = true if options[:type] == :confirm_and_wait
end
def commit_transaction(channel_commit=false)
raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit
begin
- if is_transactional? && valid? && !@need_channel_reset
- handle_errors do
- if @rollback_only
- @channel.tx_rollback
- else
- @channel.tx_commit
+ if @in_confirms_transaction
+ wait_for_confirms(@channel) unless @rollback_only
+ else
+ if is_transactional? && valid? && !@need_channel_reset
+ handle_errors do
+ if @rollback_only
+ @channel.tx_rollback
+ else
+ @channel.tx_commit
+ end
end
end
end
ensure
@rollback_only = false
@in_transaction = false
+ @in_confirms_transaction = false
end
end
+ def wait_for_confirms(channel)
+ until channel.unconfirmed_set.empty?
+ channel.wait_for_confirms
+ end
+ end
+ private :wait_for_confirms
+
def rollback_transaction
@rollback_only = true
commit_transaction
end
- def is_transactional?
+ def transactional?
@is_transactional
end
+ alias_method :is_transactional?, :transactional?
def in_transaction?
@in_transaction
end
def publish(destination, body, headers={}, properties={})
exchange, routing_key, props = *destination.publish_params(headers, properties)
+ confirm = props.delete(:confirm)
+ confirm = false if confirm.nil?
with_channel(true) do |ch|
+ if confirm == true
+ ch.confirm_select unless ch.using_publisher_confirms?
+ end
ch.basic_publish(body, exchange, routing_key, props)
+ ch.wait_for_confirms if confirm == true
end
end
def pop_message(destination, options={})
raise MessageDriver::Error, "You can't pop a message off an exchange" if destination.is_a? ExchangeDestination
with_channel(false) do |ch|
queue = ch.queue(destination.name, passive: true)
- message = queue.pop(ack: !!options[:client_ack])
+ message = queue.pop(ack: options.fetch(:client_ack, false))
if message.nil? || message[0].nil?
nil
else
args_to_message(*message)
end
@@ -320,18 +341,18 @@
def supports_client_acks?
true
end
- def ack_message(message, options={})
+ def ack_message(message, _options={})
with_channel(true) do |ch|
ch.ack(message.delivery_tag)
end
end
def nack_message(message, options={})
- requeue = options[:requeue].kind_of?(FalseClass) ? false : true
+ requeue = options[:requeue].is_a?(FalseClass) ? false : true
with_channel(true) do |ch|
ch.reject(message.delivery_tag, requeue)
end
end
@@ -369,11 +390,11 @@
begin
yield
rescue Bunny::ChannelLevelException => e
@need_channel_reset = true
@rollback_only = true if in_transaction?
- if e.kind_of? Bunny::NotFound
+ if e.is_a? Bunny::NotFound
raise MessageDriver::QueueNotFound.new(e.to_s, e)
else
raise MessageDriver::WrappedError.new(e.to_s, e)
end
rescue Bunny::ChannelAlreadyClosed => e
@@ -387,16 +408,22 @@
end
end
def with_channel(require_commit=true)
raise MessageDriver::TransactionRollbackOnly if @rollback_only
- raise MessageDriver::Error, "this adapter context is not valid!" if !valid?
+ raise MessageDriver::Error, 'this adapter context is not valid!' unless valid?
@channel = adapter.connection.create_channel if @channel.nil?
reset_channel if @need_channel_reset
- if in_transaction? && !is_transactional?
- @channel.tx_select
- @is_transactional = true
+ if in_transaction?
+ if @in_confirms_transaction
+ @channel.confirm_select unless @channel.using_publisher_confirmations?
+ else
+ unless is_transactional?
+ @channel.tx_select
+ @is_transactional = true
+ end
+ end
end
handle_errors do
result = yield @channel
commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
result
@@ -428,13 +455,13 @@
logger.error exception_to_str(e)
end
end
def validate_bunny_version
- required = Gem::Requirement.create('>= 1.1.3')
+ required = Gem::Requirement.create('>= 1.2.2')
current = Gem::Version.create(Bunny::VERSION)
unless required.satisfied_by? current
- raise MessageDriver::Error, "bunny 1.1.3 or later is required for the bunny adapter"
+ raise MessageDriver::Error, 'bunny 1.2.2 or later is required for the bunny adapter'
end
end
end
end
end