lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.6.1 vs lib/message_driver/adapters/stomp_adapter.rb in message-driver-0.7.0
- old
+ new
@@ -10,13 +10,13 @@
module Adapters
class StompAdapter < Base
class Message < MessageDriver::Message::Base
attr_reader :stomp_message
- def initialize(ctx, stomp_message)
+ def initialize(ctx, destination, stomp_message)
@stomp_message = stomp_message
- super(ctx, stomp_message.body, stomp_message.headers, {})
+ super(ctx, destination, stomp_message.body, stomp_message.headers, {})
end
end
class Destination < MessageDriver::Destination::Base
def queue_path
@@ -46,25 +46,25 @@
class StompContext < ContextBase
extend Forwardable
def_delegators :adapter, :with_connection, :poll_timeout
- # def subscribe(destination, consumer)
+ # def handle_subscribe(destination, consumer)
# destination.subscribe(&consumer)
# end
- def create_destination(name, dest_options = {}, message_props = {})
+ def handle_create_destination(name, dest_options = {}, message_props = {})
Destination.new(adapter, name, dest_options, message_props)
end
- def publish(destination, body, headers = {}, _properties = {})
+ def handle_publish(destination, body, headers = {}, _properties = {})
with_connection do |connection|
connection.publish(destination.queue_path, body, headers)
end
end
- def pop_message(destination, options = {})
+ def handle_pop_message(destination, options = {})
with_connection do |connection|
sub_id = connection.uuid
msg = nil
count = 0
connection.subscribe(destination.queue_path, options, sub_id)
@@ -74,11 +74,11 @@
count += 1
sleep 0.1
end
end
connection.unsubscribe(destination.queue_path, options, sub_id)
- Message.new(self, msg) if msg
+ Message.new(self, destination, msg) if msg
end
end
private
@@ -105,19 +105,19 @@
private
def open_connection
conn = Stomp::Connection.new(@config)
- fail MessageDriver::ConnectionError, conn.connection_frame.to_s unless conn.open?
+ raise MessageDriver::ConnectionError, conn.connection_frame.to_s unless conn.open?
conn
end
def validate_stomp_version
required = Gem::Requirement.create('~> 1.3.1')
current = Gem::Version.create(Stomp::Version::STRING)
unless required.satisfied_by? current
- fail MessageDriver::Error,
- 'stomp 1.3.1 or a later version of the 1.3.x series is required for the stomp adapter'
+ raise MessageDriver::Error,
+ 'stomp 1.3.1 or a later version of the 1.3.x series is required for the stomp adapter'
end
end
end
end
end