lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.4.0 vs lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.5.0

- old
+ new

@@ -17,10 +17,15 @@ super(ctx, stomp_message.body, stomp_message.headers, {}) end end class Destination < MessageDriver::Destination::Base + def queue_path + @queue_path ||= begin + name.start_with?('/') ? name : "/queue/#{name}" + end + end end attr_reader :config, :poll_timeout def initialize(broker, config) @@ -41,41 +46,38 @@ class StompContext < ContextBase extend Forwardable def_delegators :adapter, :with_connection, :poll_timeout - #def subscribe(destination, consumer) - #destination.subscribe(&consumer) - #end + # def subscribe(destination, consumer) + # destination.subscribe(&consumer) + # end - def create_destination(name, dest_options={}, message_props={}) - unless name.start_with?('/') - name = "/queue/#{name}" - end + def create_destination(name, dest_options = {}, message_props = {}) Destination.new(adapter, name, dest_options, message_props) end - def publish(destination, body, headers={}, _properties={}) + def publish(destination, body, headers = {}, _properties = {}) with_connection do |connection| - connection.publish(destination.name, body, headers) + connection.publish(destination.queue_path, body, headers) end end - def pop_message(destination, options={}) + def pop_message(destination, options = {}) with_connection do |connection| sub_id = connection.uuid msg = nil count = 0 - connection.subscribe(destination.name, options, sub_id) + connection.subscribe(destination.queue_path, options, sub_id) while msg.nil? && count < max_poll_count msg = connection.poll if msg.nil? count += 1 sleep 0.1 end end - connection.unsubscribe(destination.name, options, sub_id) + connection.unsubscribe(destination.queue_path, options, sub_id) Message.new(self, msg) if msg end end private @@ -88,37 +90,34 @@ def build_context StompContext.new(self) end def with_connection - begin - @connection ||= open_connection - yield @connection - rescue SystemCallError, IOError => e - raise MessageDriver::ConnectionError.new(e) - end + @connection ||= open_connection + yield @connection + rescue SystemCallError, IOError => e + raise MessageDriver::ConnectionError.new(e) end def stop super - if @connection - @connection.disconnect - end + @connection.disconnect if @connection end private def open_connection conn = Stomp::Connection.new(@config) - raise MessageDriver::ConnectionError, conn.connection_frame.to_s unless conn.open? + fail MessageDriver::ConnectionError, conn.connection_frame.to_s unless conn.open? conn end def validate_stomp_version required = Gem::Requirement.create('~> 1.3.1') current = Gem::Version.create(Stomp::Version::STRING) unless required.satisfied_by? current - raise MessageDriver::Error, 'stomp 1.3.1 or a later version of the 1.3.x series is required for the stomp adapter' + fail MessageDriver::Error, + 'stomp 1.3.1 or a later version of the 1.3.x series is required for the stomp adapter' end end end end end