lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.2.2 vs lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.3.0

- old
+ new

@@ -11,25 +11,26 @@ module Adapters class StompAdapter < Base class Message < MessageDriver::Message::Base attr_reader :stomp_message - def initialize(stomp_message) + def initialize(ctx, stomp_message) @stomp_message = stomp_message - super(stomp_message.body, stomp_message.headers, {}) + super(ctx, stomp_message.body, stomp_message.headers, {}) end end class Destination < MessageDriver::Destination::Base end attr_reader :config, :poll_timeout - def initialize(config) + def initialize(broker, config) validate_stomp_version + @broker = broker @config = config.symbolize_keys connect_headers = @config[:connect_headers] ||= {} connect_headers.symbolize_keys connect_headers[:"accept-version"] = "1.1,1.2" @@ -50,11 +51,11 @@ def create_destination(name, dest_options={}, message_props={}) unless name.start_with?("/") name = "/queue/#{name}" end - Destination.new(self, name, dest_options, message_props) + Destination.new(adapter, name, dest_options, message_props) end def publish(destination, body, headers={}, properties={}) with_connection do |connection| connection.publish(destination.name, body, headers) @@ -64,20 +65,19 @@ def pop_message(destination, options={}) with_connection do |connection| sub_id = connection.uuid msg = nil count = 0 - options[:id] = sub_id #this is a workaround for https://github.com/stompgem/stomp/issues/56 connection.subscribe(destination.name, options, sub_id) while msg.nil? && count < max_poll_count msg = connection.poll if msg.nil? count += 1 sleep 0.1 end end connection.unsubscribe(destination.name, options, sub_id) - Message.new(msg) if msg + Message.new(self, msg) if msg end end private