Sha256: abf874a85f04ba400d52b42e29ce174d3a7a1e24b7b07b5dc713555f43435529

Contents?: true

Size: 1.7 KB

Versions: 1

Compression:

Stored size: 1.7 KB

Contents

require 'bunny'
require 'json'
require 'securerandom'

module EasyBunnyRPC
  class Client
    def initialize options={}
      @options = options
      @subscribed = false

      # The timed queue which will collect incoming messages
      @timed_queue = EasyBunnyRPC::TimedQueue.new

      # Need to set a default timeout. How about 5 seconds?
      set_timeout(5)
    end

    private

    def pop
      correlation_id, payload = @timed_queue.pop_with_timeout(@timeout)

      JSON.parse(payload).merge!({ 'correlation_id' => correlation_id })
    end

    def set_timeout(value)
      @timeout = value
    end

    def start_subscription
      queue.subscribe(block: false) do |delivery_info, properties, payload|
        @timed_queue.push([properties.correlation_id, payload])
      end

      @subscribed = true
    end

    def publish(payload, correlation_id=default_correlation_id)
      start_subscription unless @subscribed
      exchange.publish([payload].to_json, routing_key: @options[:queue], correlation_id: correlation_id, reply_to: queue.name, expiration: (@timeout*1000).to_i)
    end

    def connection
      return @connection if defined?(@connection)

      @connection = Bunny.new(@options[:bunny])
      @connection.start
      @connection
    end

    def channel
      @channel ||= connection.create_channel
    end

    # The exclusive no-name queue that is created for communicating back to me
    def queue
      @queue ||= channel.queue(generate_queue_name, exclusive: true)
    end

    def generate_queue_name
      [@options[:queue], '-client_', SecureRandom.hex].join
    end

    def exchange
      @exchange ||= channel.default_exchange
    end

    def default_correlation_id
      ''
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
easy_bunny_rpc-0.1.0.alpha lib/easy_bunny_rpc/client.rb