lib/message_driver/client.rb in message-driver-0.4.0 vs lib/message_driver/client.rb in message-driver-0.5.0

- old
+ new

@@ -3,39 +3,37 @@ module MessageDriver module Client include Logging extend self - def publish(destination, body, headers={}, properties={}) - dest = find_destination(destination) - current_adapter_context.publish(dest, body, headers, properties) + def publish(destination, body, headers = {}, properties = {}) + find_destination(destination).publish(body, headers, properties) end - def pop_message(destination, options={}) - dest = find_destination(destination) - current_adapter_context.pop_message(dest, options) + def pop_message(destination, options = {}) + find_destination(destination).pop_message(options) end - def subscribe(destination_name, consumer_name, options={}) + def subscribe(destination_name, consumer_name, options = {}) consumer = find_consumer(consumer_name) subscribe_with(destination_name, options, &consumer) end - def subscribe_with(destination_name, options={}, &consumer) + def subscribe_with(destination_name, options = {}, &consumer) destination = find_destination(destination_name) current_adapter_context.subscribe(destination, options, &consumer) end - def dynamic_destination(dest_name, dest_options={}, message_props={}) + def dynamic_destination(dest_name, dest_options = {}, message_props = {}) current_adapter_context.create_destination(dest_name, dest_options, message_props) end - def ack_message(message, options={}) + def ack_message(message, options = {}) message.ack(options) end - def nack_message(message, options={}) + def nack_message(message, options = {}) message.nack(options) end def consumer(key, &block) broker.consumer(key, &block) @@ -52,11 +50,11 @@ def find_consumer(consumer) broker.find_consumer(consumer) end - def with_message_transaction(options={}) + def with_message_transaction(options = {}) wrapper = fetch_context_wrapper wrapper.increment_transaction_depth begin if wrapper.ctx.supports_transactions? if wrapper.transaction_depth == 1 @@ -82,17 +80,18 @@ ensure wrapper.decrement_transaction_depth end end - def current_adapter_context(initialize=true) + def current_adapter_context(initialize = true) ctx = fetch_context_wrapper(initialize) ctx.nil? ? nil : ctx.ctx end def with_adapter_context(adapter_context) - old_ctx, Thread.current[adapter_context_key] = fetch_context_wrapper(false), build_context_wrapper(adapter_context) + old_ctx = fetch_context_wrapper(false) + Thread.current[adapter_context_key] = build_context_wrapper(adapter_context) begin yield ensure set_context_wrapper(old_ctx) end @@ -130,11 +129,11 @@ Broker.client(index) end private - def fetch_context_wrapper(initialize=true) + def fetch_context_wrapper(initialize = true) wrapper = Thread.current[adapter_context_key] if wrapper.nil? || !wrapper.valid? if initialize wrapper = build_context_wrapper else @@ -147,10 +146,10 @@ def set_context_wrapper(wrapper) Thread.current[adapter_context_key] = wrapper end - def build_context_wrapper(ctx=adapter.new_context) + def build_context_wrapper(ctx = adapter.new_context) ContextWrapper.new(ctx) end def adapter broker.adapter