lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.2.2 vs lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.3.0
- old
+ new
@@ -11,25 +11,26 @@
module Adapters
class StompAdapter < Base
class Message < MessageDriver::Message::Base
attr_reader :stomp_message
- def initialize(stomp_message)
+ def initialize(ctx, stomp_message)
@stomp_message = stomp_message
- super(stomp_message.body, stomp_message.headers, {})
+ super(ctx, stomp_message.body, stomp_message.headers, {})
end
end
class Destination < MessageDriver::Destination::Base
end
attr_reader :config, :poll_timeout
- def initialize(config)
+ def initialize(broker, config)
validate_stomp_version
+ @broker = broker
@config = config.symbolize_keys
connect_headers = @config[:connect_headers] ||= {}
connect_headers.symbolize_keys
connect_headers[:"accept-version"] = "1.1,1.2"
@@ -50,11 +51,11 @@
def create_destination(name, dest_options={}, message_props={})
unless name.start_with?("/")
name = "/queue/#{name}"
end
- Destination.new(self, name, dest_options, message_props)
+ Destination.new(adapter, name, dest_options, message_props)
end
def publish(destination, body, headers={}, properties={})
with_connection do |connection|
connection.publish(destination.name, body, headers)
@@ -64,20 +65,19 @@
def pop_message(destination, options={})
with_connection do |connection|
sub_id = connection.uuid
msg = nil
count = 0
- options[:id] = sub_id #this is a workaround for https://github.com/stompgem/stomp/issues/56
connection.subscribe(destination.name, options, sub_id)
while msg.nil? && count < max_poll_count
msg = connection.poll
if msg.nil?
count += 1
sleep 0.1
end
end
connection.unsubscribe(destination.name, options, sub_id)
- Message.new(msg) if msg
+ Message.new(self, msg) if msg
end
end
private