Sha256: 8dcc9664ce1ab981ea5e12aac786e94fb828bdf806c8a0ef3266372c27d6f1a2

Contents?: true

Size: 1.82 KB

Versions: 6

Compression:

Stored size: 1.82 KB

Contents

require 'puppet/util/queue'
require 'stomp'
require 'uri'

# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client
# registry, for use with the <tt>:queue</tt> indirection terminus type.
#
# Looks to <tt>Puppet[:queue_source]</tt> for the sole argument to the underlying Stomp::Client constructor;
# consequently, for this client to work, <tt>Puppet[:queue_source]</tt> must use the Stomp::Client URL-like
# syntax for identifying the Stomp message broker: <em>login:pass@host.port</em>
class Puppet::Util::Queue::Stomp
    attr_accessor :stomp_client

    def initialize
        begin
            uri = URI.parse(Puppet[:queue_source])
        rescue => detail
            raise ArgumentError, "Could not create Stomp client instance - queue source %s is invalid: %s" % [Puppet[:queue_source], detail]
        end
        unless uri.scheme == "stomp"
            raise ArgumentError, "Could not create Stomp client instance - queue source %s is not a Stomp URL: %s" % [Puppet[:queue_source], detail]
        end

        begin
            self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
        rescue => detail
            raise ArgumentError, "Could not create Stomp client instance with queue source %s: got internal Stomp client error %s" % [Puppet[:queue_source], detail]
        end
    end

    def send_message(target, msg)
        stomp_client.send(stompify_target(target), msg, :persistent => true)
    end

    def subscribe(target)
        stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
            yield(stomp_message.body)
            stomp_client.acknowledge(stomp_message)
        end
    end

    def stompify_target(target)
        '/queue/' + target.to_s
    end

    Puppet::Util::Queue.register_queue_type(self, :stomp)
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
puppet-0.25.5 lib/puppet/util/queue/stomp.rb
puppet-0.25.4 lib/puppet/util/queue/stomp.rb
puppet-0.25.3 lib/puppet/util/queue/stomp.rb
puppet-0.25.2 lib/puppet/util/queue/stomp.rb
puppet-0.25.1 lib/puppet/util/queue/stomp.rb
puppet-0.25.0 lib/puppet/util/queue/stomp.rb