require 'puppet/util/queue'
require 'stomp'
require 'uri'
# Implements the Ruby Stomp client as a queue type within the
# Puppet::Indirector::Queue::Client registry, for use with the :queue
# indirection terminus type.
#
# Looks to Puppet[:queue_source] for the sole argument to the
# underlying Stomp::Client constructor; consequently, for this client to work,
# Puppet[:queue_source] must use the Stomp::Client URL-like syntax
# for identifying the Stomp message broker: login:pass@host.port
class Puppet::Util::Queue::Stomp
attr_accessor :stomp_client
def initialize
begin
uri = URI.parse(Puppet[:queue_source])
rescue => detail
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}"
end
unless uri.scheme == "stomp"
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL: #{detail}"
end
begin
self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
rescue => detail
raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}"
end
# Identify the supported method for sending messages.
@method =
case
when stomp_client.respond_to?(:publish)
:publish
when stomp_client.respond_to?(:send)
:send
else
raise ArgumentError, "STOMP client does not respond to either publish or send"
end
end
def publish_message(target, msg)
stomp_client.__send__(@method, stompify_target(target), msg, :persistent => true)
end
def subscribe(target)
stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
yield(stomp_message.body)
stomp_client.acknowledge(stomp_message)
end
end
def stompify_target(target)
'/queue/' + target.to_s
end
Puppet::Util::Queue.register_queue_type(self, :stomp)
end