lib/message_driver/client.rb in message-driver-0.2.2 vs lib/message_driver/client.rb in message-driver-0.3.0
- old
+ new
@@ -28,27 +28,34 @@
def dynamic_destination(dest_name, dest_options={}, message_props={})
current_adapter_context.create_destination(dest_name, dest_options, message_props)
end
def ack_message(message, options={})
- ctx = current_adapter_context
- if ctx.supports_client_acks?
- ctx.ack_message(message, options)
- else
- logger.debug("this adapter does not support client acks")
- end
+ message.ack(options)
end
def nack_message(message, options={})
- ctx = current_adapter_context
- if ctx.supports_client_acks?
- ctx.nack_message(message, options)
+ message.nack(options)
+ end
+
+ def consumer(key, &block)
+ broker.consumer(key, &block)
+ end
+
+ def find_destination(destination)
+ case destination
+ when Destination::Base
+ destination
else
- logger.debug("this adapter does not support client acks")
+ broker.find_destination(destination)
end
end
+ def find_consumer(consumer)
+ broker.find_consumer(consumer)
+ end
+
def with_message_transaction(options={}, &block)
wrapper = fetch_context_wrapper
wrapper.increment_transaction_depth
begin
if wrapper.ctx.supports_transactions?
@@ -81,11 +88,11 @@
ctx = fetch_context_wrapper(initialize)
ctx.nil? ? nil : ctx.ctx
end
def with_adapter_context(adapter_context, &block)
- old_ctx, Thread.current[:adapter_context] = fetch_context_wrapper(false), build_context_wrapper(adapter_context)
+ old_ctx, Thread.current[adapter_context_key] = fetch_context_wrapper(false), build_context_wrapper(adapter_context)
begin
yield
ensure
set_context_wrapper(old_ctx)
end
@@ -97,50 +104,65 @@
wrapper.invalidate
set_context_wrapper(nil)
end
end
+ def broker
+ Broker.broker(broker_name)
+ end
+
+ def broker_name
+ Broker::DEFAULT_BROKER_NAME
+ end
+
+ def for_broker(_broker_name)
+ Module.new do |mod|
+ include Client
+ extend self
+
+ define_method :broker_name do
+ _broker_name
+ end
+ end
+ end
+ module_function :for_broker
+
+ def [](index)
+ Broker.client(index)
+ end
+
private
def fetch_context_wrapper(initialize=true)
- wrapper = Thread.current[:adapter_context]
+ wrapper = Thread.current[adapter_context_key]
if wrapper.nil? || !wrapper.valid?
if initialize
wrapper = build_context_wrapper
else
wrapper = nil
end
- Thread.current[:adapter_context] = wrapper
+ Thread.current[adapter_context_key] = wrapper
end
wrapper
end
def set_context_wrapper(wrapper)
- Thread.current[:adapter_context] = wrapper
+ Thread.current[adapter_context_key] = wrapper
end
- def build_context_wrapper(ctx=Broker.adapter.new_context)
+ def build_context_wrapper(ctx=adapter.new_context)
ContextWrapper.new(ctx)
end
- def find_destination(destination)
- case destination
- when Destination::Base
- destination
- else
- Broker.find_destination(destination)
- end
+ def adapter
+ broker.adapter
end
- def find_consumer(consumer)
- Broker.find_consumer(consumer)
+ def adapter_context_key
+ @__adapter_context_key ||= "#{broker_name}_adapter_context".to_sym
end
- def adapter
- Broker.adapter
- end
-
class ContextWrapper
extend Forwardable
def_delegators :@ctx, :valid?, :invalidate
@@ -159,7 +181,6 @@
def decrement_transaction_depth
@transaction_depth -= 1
end
end
end
-
end