lib/message_driver/client.rb in message-driver-0.2.2 vs lib/message_driver/client.rb in message-driver-0.3.0

- old
+ new

@@ -28,27 +28,34 @@ def dynamic_destination(dest_name, dest_options={}, message_props={}) current_adapter_context.create_destination(dest_name, dest_options, message_props) end def ack_message(message, options={}) - ctx = current_adapter_context - if ctx.supports_client_acks? - ctx.ack_message(message, options) - else - logger.debug("this adapter does not support client acks") - end + message.ack(options) end def nack_message(message, options={}) - ctx = current_adapter_context - if ctx.supports_client_acks? - ctx.nack_message(message, options) + message.nack(options) + end + + def consumer(key, &block) + broker.consumer(key, &block) + end + + def find_destination(destination) + case destination + when Destination::Base + destination else - logger.debug("this adapter does not support client acks") + broker.find_destination(destination) end end + def find_consumer(consumer) + broker.find_consumer(consumer) + end + def with_message_transaction(options={}, &block) wrapper = fetch_context_wrapper wrapper.increment_transaction_depth begin if wrapper.ctx.supports_transactions? @@ -81,11 +88,11 @@ ctx = fetch_context_wrapper(initialize) ctx.nil? ? nil : ctx.ctx end def with_adapter_context(adapter_context, &block) - old_ctx, Thread.current[:adapter_context] = fetch_context_wrapper(false), build_context_wrapper(adapter_context) + old_ctx, Thread.current[adapter_context_key] = fetch_context_wrapper(false), build_context_wrapper(adapter_context) begin yield ensure set_context_wrapper(old_ctx) end @@ -97,50 +104,65 @@ wrapper.invalidate set_context_wrapper(nil) end end + def broker + Broker.broker(broker_name) + end + + def broker_name + Broker::DEFAULT_BROKER_NAME + end + + def for_broker(_broker_name) + Module.new do |mod| + include Client + extend self + + define_method :broker_name do + _broker_name + end + end + end + module_function :for_broker + + def [](index) + Broker.client(index) + end + private def fetch_context_wrapper(initialize=true) - wrapper = Thread.current[:adapter_context] + wrapper = Thread.current[adapter_context_key] if wrapper.nil? || !wrapper.valid? if initialize wrapper = build_context_wrapper else wrapper = nil end - Thread.current[:adapter_context] = wrapper + Thread.current[adapter_context_key] = wrapper end wrapper end def set_context_wrapper(wrapper) - Thread.current[:adapter_context] = wrapper + Thread.current[adapter_context_key] = wrapper end - def build_context_wrapper(ctx=Broker.adapter.new_context) + def build_context_wrapper(ctx=adapter.new_context) ContextWrapper.new(ctx) end - def find_destination(destination) - case destination - when Destination::Base - destination - else - Broker.find_destination(destination) - end + def adapter + broker.adapter end - def find_consumer(consumer) - Broker.find_consumer(consumer) + def adapter_context_key + @__adapter_context_key ||= "#{broker_name}_adapter_context".to_sym end - def adapter - Broker.adapter - end - class ContextWrapper extend Forwardable def_delegators :@ctx, :valid?, :invalidate @@ -159,7 +181,6 @@ def decrement_transaction_depth @transaction_depth -= 1 end end end - end