lib/message_driver/client.rb in message-driver-0.4.0 vs lib/message_driver/client.rb in message-driver-0.5.0
- old
+ new
@@ -3,39 +3,37 @@
module MessageDriver
module Client
include Logging
extend self
- def publish(destination, body, headers={}, properties={})
- dest = find_destination(destination)
- current_adapter_context.publish(dest, body, headers, properties)
+ def publish(destination, body, headers = {}, properties = {})
+ find_destination(destination).publish(body, headers, properties)
end
- def pop_message(destination, options={})
- dest = find_destination(destination)
- current_adapter_context.pop_message(dest, options)
+ def pop_message(destination, options = {})
+ find_destination(destination).pop_message(options)
end
- def subscribe(destination_name, consumer_name, options={})
+ def subscribe(destination_name, consumer_name, options = {})
consumer = find_consumer(consumer_name)
subscribe_with(destination_name, options, &consumer)
end
- def subscribe_with(destination_name, options={}, &consumer)
+ def subscribe_with(destination_name, options = {}, &consumer)
destination = find_destination(destination_name)
current_adapter_context.subscribe(destination, options, &consumer)
end
- def dynamic_destination(dest_name, dest_options={}, message_props={})
+ def dynamic_destination(dest_name, dest_options = {}, message_props = {})
current_adapter_context.create_destination(dest_name, dest_options, message_props)
end
- def ack_message(message, options={})
+ def ack_message(message, options = {})
message.ack(options)
end
- def nack_message(message, options={})
+ def nack_message(message, options = {})
message.nack(options)
end
def consumer(key, &block)
broker.consumer(key, &block)
@@ -52,11 +50,11 @@
def find_consumer(consumer)
broker.find_consumer(consumer)
end
- def with_message_transaction(options={})
+ def with_message_transaction(options = {})
wrapper = fetch_context_wrapper
wrapper.increment_transaction_depth
begin
if wrapper.ctx.supports_transactions?
if wrapper.transaction_depth == 1
@@ -82,17 +80,18 @@
ensure
wrapper.decrement_transaction_depth
end
end
- def current_adapter_context(initialize=true)
+ def current_adapter_context(initialize = true)
ctx = fetch_context_wrapper(initialize)
ctx.nil? ? nil : ctx.ctx
end
def with_adapter_context(adapter_context)
- old_ctx, Thread.current[adapter_context_key] = fetch_context_wrapper(false), build_context_wrapper(adapter_context)
+ old_ctx = fetch_context_wrapper(false)
+ Thread.current[adapter_context_key] = build_context_wrapper(adapter_context)
begin
yield
ensure
set_context_wrapper(old_ctx)
end
@@ -130,11 +129,11 @@
Broker.client(index)
end
private
- def fetch_context_wrapper(initialize=true)
+ def fetch_context_wrapper(initialize = true)
wrapper = Thread.current[adapter_context_key]
if wrapper.nil? || !wrapper.valid?
if initialize
wrapper = build_context_wrapper
else
@@ -147,10 +146,10 @@
def set_context_wrapper(wrapper)
Thread.current[adapter_context_key] = wrapper
end
- def build_context_wrapper(ctx=adapter.new_context)
+ def build_context_wrapper(ctx = adapter.new_context)
ContextWrapper.new(ctx)
end
def adapter
broker.adapter