lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.2.2 vs lib/message_driver/adapters/bunny_adapter.rb in message-driver-0.3.0
- old
+ new
@@ -1,7 +1,6 @@
require 'bunny'
-require 'bunny/session_patch'
module MessageDriver
class Broker
def bunny_adapter
MessageDriver::Adapters::BunnyAdapter
@@ -13,12 +12,12 @@
NETWORK_ERRORS = [Bunny::TCPConnectionFailed, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException, Bunny::NetworkErrorWrapper, Bunny::NetworkFailure, IOError].freeze
class Message < MessageDriver::Message::Base
attr_reader :delivery_info
- def initialize(delivery_info, properties, payload)
- super(payload, properties[:headers]||{}, properties)
+ def initialize(ctx, delivery_info, properties, payload)
+ super(ctx, payload, properties[:headers]||{}, properties)
@delivery_info = delivery_info
end
def delivery_tag
delivery_info.delivery_tag
@@ -78,17 +77,17 @@
def routing_key(properties)
@name
end
def message_count
- Client.current_adapter_context.with_channel(false) do |ch|
+ adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
ch.queue(@name, @dest_options.merge(passive: true)).message_count
end
end
def purge
- Client.current_adapter_context.with_channel(false) do |ch|
+ adapter.broker.client.current_adapter_context.with_channel(false) do |ch|
bunny_queue(ch).purge
end
end
end
@@ -147,11 +146,11 @@
queue = destination.bunny_queue(@sub_ctx.channel)
if options.has_key? :prefetch_size
ch.prefetch(options[:prefetch_size])
end
@bunny_consumer = queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
- Client.with_adapter_context(@sub_ctx) do
+ adapter.broker.client.with_adapter_context(@sub_ctx) do
message = @sub_ctx.args_to_message(delivery_info, properties, payload)
handle_message(message)
end
end
end
@@ -164,11 +163,11 @@
consumer.call(message)
@sub_ctx.ack_message(message)
when :manual
consumer.call(message)
when :transactional
- Client.with_message_transaction do
+ adapter.broker.client.with_message_transaction do
consumer.call(message)
@sub_ctx.ack_message(message)
end
end
rescue => e
@@ -188,39 +187,39 @@
@error_handler.call(e, message) unless @error_handler.nil?
end
end
end
- def initialize(config)
+ def initialize(broker, config)
validate_bunny_version
+ @broker = broker
@config = config
- @handle_connection_errors = config.fetch(:handle_connection_errors, true)
- initialize_connection
end
def connection(ensure_started=true)
- initialize_connection
if ensure_started
begin
+ @connection ||= Bunny::Session.new(@config)
@connection.start
rescue *NETWORK_ERRORS => e
raise MessageDriver::ConnectionError.new(e.to_s, e)
+ rescue => e
+ stop
+ raise e
end
end
@connection
end
def stop
begin
super
- @connection.close if !@connection.nil? && @connection.open?
- rescue *NETWORK_ERRORS => e
+ @connection.close if !@connection.nil?
+ rescue => e
logger.error "error while attempting connection close\n#{exception_to_str(e)}"
ensure
- conn = @connection
@connection = nil
- conn.cleanup_threads unless conn.nil?
end
end
def build_context
BunnyContext.new(self)
@@ -261,16 +260,10 @@
true
end
def begin_transaction(options={})
raise MessageDriver::TransactionError, "you can't begin another transaction, you are already in one!" if in_transaction?
- unless is_transactional?
- with_channel(false) do |ch|
- ch.tx_select
- end
- @is_transactional = true
- end
@in_transaction = true
end
def commit_transaction(channel_commit=false)
raise MessageDriver::TransactionError, "you can't finish the transaction unless you already in one!" if !in_transaction? && !channel_commit
@@ -394,22 +387,26 @@
end
end
def with_channel(require_commit=true)
raise MessageDriver::TransactionRollbackOnly if @rollback_only
- raise MessageDriver::Error, "oh nos!" if !valid?
+ raise MessageDriver::Error, "this adapter context is not valid!" if !valid?
@channel = adapter.connection.create_channel if @channel.nil?
reset_channel if @need_channel_reset
+ if in_transaction? && !is_transactional?
+ @channel.tx_select
+ @is_transactional = true
+ end
handle_errors do
result = yield @channel
commit_transaction(true) if require_commit && is_transactional? && !in_transaction?
result
end
end
def args_to_message(delivery_info, properties, payload)
- Message.new(delivery_info, properties, payload)
+ Message.new(self, delivery_info, properties, payload)
end
private
def reset_channel
@@ -430,46 +427,14 @@
rescue => e
logger.error exception_to_str(e)
end
end
- def initialize_connection
- if @handle_connection_errors
- if @connection_thread.nil?
- #hi mom!
- @connection_thread = Thread.new do
- @connection = Bunny.new(@config)
- begin
- sleep
- rescue *NETWORK_ERRORS => e
- logger.error "error on connection\n#{exception_to_str(e)}"
- if @connection.automatically_recover?
- sleep 0.1
- unless @connection.recovering_from_network_failure?
- stop
- end
- else
- stop
- end
- retry
- rescue => e
- logger.error "unhandled error in connection thread! #{exception_to_str(e)}"
- end
- end
- @connection_thread.abort_on_exception = true
- sleep 0.1
- end
- sleep 0.1 while @connection_thread.status != 'sleep'
- else
- @connection ||= Bunny.new(@config)
- end
- end
-
def validate_bunny_version
- required = Gem::Requirement.create('>= 0.10.8')
+ required = Gem::Requirement.create('>= 1.1.3')
current = Gem::Version.create(Bunny::VERSION)
unless required.satisfied_by? current
- raise MessageDriver::Error, "bunny 0.10.8 or later is required for the bunny adapter"
+ raise MessageDriver::Error, "bunny 1.1.3 or later is required for the bunny adapter"
end
end
end
end
end