Sha256: be4c142e52fbec4beaf7673a1a0337c3d3a848e2610f04ea5544e835b4078d66

Contents?: true

Size: 1.49 KB

Versions: 5

Compression:

Stored size: 1.49 KB

Contents

module CukeQ
  # Holds the connection settings for an AMQP broker, parsed from a URI, and
  # the named queues (:ping, :pong, :results, :jobs) that are created once the
  # connection has been started via #start.
  class Broker
    attr_reader :user, :pass, :host, :port, :vhost

    # @param uri [String, URI::Generic] broker location; a String is parsed
    #   with URI.parse, anything else is used as-is and must respond to
    #   #user, #password, #host, #port and #path.
    # @param opts [Hash] optional settings
    # @option opts [Integer, String] :timeout value handed to AMQP.start as
    #   :timeout (default 20); coerced with Kernel#Integer.
    # @raise [ArgumentError] if the URI carries no user, or :timeout is not
    #   integer-like.
    def initialize(uri, opts = {})
      uri = URI.parse(uri) if uri.kind_of? String

      @user    = uri.user     || raise(ArgumentError, "no user given")
      @pass    = uri.password || 'cukeq123'
      @host    = uri.host     || 'localhost'
      @port    = uri.port     || 5672

      # URI#path is "" (not nil) when the URI has no path component, so a
      # plain `||` would never fall back to the default vhost.
      path     = uri.path.to_s
      @vhost   = path.empty? ? '/cukeq' : path

      @timeout = Integer(opts[:timeout] || 20)
      @queues  = {}
    end

    # Logs the target, then calls AMQP.start with #amqp_options. Inside the
    # block given to AMQP.start the queues are created and the caller's block
    # (if any) is yielded to.
    #
    # NOTE(review): `log` is not defined in this class; it is expected to be
    # provided elsewhere in the project.
    def start
      log self.class, :start, "#{@host}:#{@port}#{@vhost}"

      AMQP.start(amqp_options) do
        create_queues
        yield if block_given?
      end
    end

    # Publishes +json+ on the queue registered under +queue_name+.
    #
    # @raise [RuntimeError] if the queue is unknown (see #queue_for)
    def publish(queue_name, json)
      queue_for(queue_name).publish(json)
    end

    # Subscribes +blk+ to the queue registered under +queue_name+.
    #
    # @raise [RuntimeError] if the queue is unknown (see #queue_for)
    def subscribe(queue_name, &blk)
      queue_for(queue_name).subscribe(&blk)
    end

    # Unsubscribes from the queue registered under +queue_name+, forwarding
    # +blk+ to the queue's #unsubscribe.
    #
    # @raise [RuntimeError] if the queue is unknown (see #queue_for)
    def unsubscribe(queue_name, &blk)
      queue_for(queue_name).unsubscribe(&blk)
    end

    # Looks up a queue created by #start.
    #
    # @param name [Symbol] one of :ping, :pong, :results, :jobs
    # @return the queue object stored under +name+
    # @raise [RuntimeError] if no queue is registered under +name+ (which is
    #   always the case before #start has created the queues)
    def queue_for(name)
      @queues[name] || raise("unknown queue: #{name.inspect}")
    end

    private

    # Registers the four cukeq queues, each obtained from its own MQ.new.
    def create_queues
      @queues[:ping]    = MQ.new.queue("cukeq.ping")
      @queues[:pong]    = MQ.new.queue("cukeq.pong")

      @queues[:results] = MQ.new.queue("cukeq.results")
      @queues[:jobs]    = MQ.new.queue("cukeq.jobs")
    end

    # Options hash passed to AMQP.start.
    def amqp_options
      {
        :host    => @host,
        :port    => @port,
        :user    => @user,
        :pass    => @pass,
        :vhost   => @vhost,
        # Honour the timeout given to #initialize rather than a fixed value.
        :timeout => @timeout
      }
    end

  end # Broker
end # CukeQ

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
cukeq-0.0.1.dev5 lib/cukeq/broker.rb
cukeq-0.0.1.dev4 lib/cukeq/broker.rb
cukeq-0.0.1.dev3 lib/cukeq/broker.rb
cukeq-0.0.1.dev2 lib/cukeq/broker.rb
cukeq-0.0.1.dev lib/cukeq/broker.rb