Sha256: 27730a70f2645547f40728339fc6bd20ace776f400068f8a6e5a7d36682f3116

Contents?: true

Size: 1.4 KB

Versions: 3

Compression:

Stored size: 1.4 KB

Contents

require 'bunny'

module BunnyRpc
  class Client
    attr_reader :payload, :correlation_id

    def initialize(options={})
      @options = options
      @mutex = Mutex.new
      @cv = ConditionVariable.new
    end

    def publish(body, queue_name)
      subscribe

      exchange.publish(body, routing_key: queue_name, reply_to: reply_queue.name, correlation_id: @correlation_id)

      add_lock

      @payload
    end

    def close
      @connect.stop
    end

    def subscribe
      reply_queue.subscribe do |_, properties, payload|
        if properties.correlation_id == @correlation_id
          @payload = payload

          del_lock
        else
          del_lock
          raise 'correlation_id error !'
        end
      end
    end

    def add_lock
      @mutex.synchronize { @cv.wait(@mutex) }
    end

    def del_lock
      @mutex.synchronize { @cv.signal }
    end

    def connect
      return @connect if defined?(@connect)
      @connect = Bunny.new(@options)
      @connect.start
      @connect
    end

    def channel
      @channel ||= connect.create_channel
    end

    def exchange
      @exchange ||= channel.default_exchange
    end

    def reply_queue
      @reply_queue ||= channel.queue(reply_queue_name, exclusive: true, auto_delete: true)
    end

    def reply_queue_name
      "reply_to-#{rand(10)}"
    end

    def correlation_id
      "#{rand(10)}#{rand(10)}#{rand(10)}"
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
bunny_rpc-0.1.3 lib/bunny_rpc/client.rb
bunny_rpc-0.1.2 lib/bunny_rpc/client.rb
bunny_rpc-0.1.1 lib/bunny_rpc/client.rb