lib/message_driver/client.rb in message-driver-0.5.3 vs lib/message_driver/client.rb in message-driver-0.6.0
- old
+ new
@@ -1,59 +1,98 @@
require 'forwardable'
module MessageDriver
+ # The client module is the primary client API for MessageDriver. It can either be
+ # included in a class that is using it, or used directly.
+ #
+ # @example Included as a Module
+ # class MyClass
+ # include MessageDriver::Client
+ #
+ # def do_work
+ # publish(:my_destination, 'Hi Mom!')
+ # end
+ # end
+ #
+ # @example Used Directly
+ # class DirectClass
+ # def use_directly
+ # MesageDriver::Client.find_destination(:my_queue)
+ # end
+ # end
module Client
include Logging
extend self
- def publish(destination, body, headers = {}, properties = {})
- find_destination(destination).publish(body, headers, properties)
+ # @!group Defining and Looking up Destinations
+
+ def dynamic_destination(dest_name, dest_options = {}, message_props = {})
+ current_adapter_context.create_destination(dest_name, dest_options, message_props)
end
- def pop_message(destination, options = {})
- find_destination(destination).pop_message(options)
+ # (see MessageDriver::Broker#find_destination)
+ # @note if +destination_name+ is a {Destination::Base}, +find_destination+ will just
+ # return that destination back
+ def find_destination(destination_name)
+ case destination_name
+ when Destination::Base
+ destination_name
+ else
+ broker.find_destination(destination_name)
+ end
end
- def subscribe(destination_name, consumer_name, options = {})
- consumer = find_consumer(consumer_name)
- subscribe_with(destination_name, options, &consumer)
+ # @!endgroup
+
+ # @!group Defining and Looking Up Consumers
+
+ def consumer(key, &block)
+ broker.consumer(key, &block)
end
- def subscribe_with(destination_name, options = {}, &consumer)
- destination = find_destination(destination_name)
- current_adapter_context.subscribe(destination, options, &consumer)
+ def find_consumer(consumer)
+ broker.find_consumer(consumer)
end
- def dynamic_destination(dest_name, dest_options = {}, message_props = {})
- current_adapter_context.create_destination(dest_name, dest_options, message_props)
+ # @!endgroup
+
+ # @!group Sending Messages
+
+ def publish(destination, body, headers = {}, properties = {})
+ find_destination(destination).publish(body, headers, properties)
end
+ # @!endgroup
+
+ # @!group Receiving Messages
+
+ def pop_message(destination, options = {})
+ find_destination(destination).pop_message(options)
+ end
+
def ack_message(message, options = {})
message.ack(options)
end
def nack_message(message, options = {})
message.nack(options)
end
- def consumer(key, &block)
- broker.consumer(key, &block)
+ def subscribe(destination_name, consumer_name, options = {})
+ consumer = find_consumer(consumer_name)
+ subscribe_with(destination_name, options, &consumer)
end
- def find_destination(destination)
- case destination
- when Destination::Base
- destination
- else
- broker.find_destination(destination)
- end
+ def subscribe_with(destination_name, options = {}, &consumer)
+ destination = find_destination(destination_name)
+ current_adapter_context.subscribe(destination, options, &consumer)
end
- def find_consumer(consumer)
- broker.find_consumer(consumer)
- end
+ # @!endgroup
+ # @!group Transaction Management
+
def with_message_transaction(options = {})
wrapper = fetch_context_wrapper
wrapper.increment_transaction_depth
begin
if wrapper.ctx.supports_transactions?
@@ -80,41 +119,49 @@
ensure
wrapper.decrement_transaction_depth
end
end
+ # @!endgroup
+
+ # @private
def current_adapter_context(initialize = true)
ctx = fetch_context_wrapper(initialize)
ctx.nil? ? nil : ctx.ctx
end
+ # @private
def with_adapter_context(adapter_context)
old_ctx = fetch_context_wrapper(false)
Thread.current[adapter_context_key] = build_context_wrapper(adapter_context)
begin
yield
ensure
set_context_wrapper(old_ctx)
end
end
+ # @private
def clear_context
wrapper = fetch_context_wrapper(false)
unless wrapper.nil?
wrapper.invalidate
set_context_wrapper(nil)
end
end
+ # @return [Broker] the broker associated with this Client module
def broker
Broker.broker(broker_name)
end
+ # @return [Symbol] the name of the broker associated with this Client module
def broker_name
Broker::DEFAULT_BROKER_NAME
end
+ # @private
def for_broker(name)
Module.new do
include Client
extend self
@@ -123,10 +170,15 @@
end
end
end
module_function :for_broker
+ # @return [Client] the client for the specified broker
+ # @example
+ # class MyClass
+ # include MessageDriver::Client[:my_broker]
+ # end
def [](index)
Broker.client(index)
end
private
@@ -158,9 +210,10 @@
def adapter_context_key
@__adapter_context_key ||= "#{broker_name}_adapter_context".to_sym
end
+ # @private
class ContextWrapper
extend Forwardable
def_delegators :@ctx, :valid?, :invalidate