Sha256: 53ed22b8502bca8c8e2ab1692c68063623444312d5a4db7f67d75f7f896a3536

Contents?: true

Size: 1.97 KB

Versions: 1

Compression:

Stored size: 1.97 KB

Contents

require 'bunny'
require 'json'
require 'securerandom'

module EasyBunnyRPC
  class Client
    def initialize options={}
      @options = options
      @subscribed = false

      # The timed queue which will collect incoming messages
      @timed_queue = EasyBunnyRPC::TimedQueue.new

      # Need to set a default timeout. How about 5 seconds?
      set_timeout(5)
    end

    def pop
      correlation_id, payload = @timed_queue.pop_with_timeout(@timeout)

      JSON.parse(payload).merge!({ 'correlation_id' => correlation_id })
    end

    def set_timeout(value)
      @timeout = value
    end

    def publish(payload, correlation_id=default_correlation_id)
      start_subscription unless @subscribed
      exchange.publish([payload].to_json, routing_key: @options[:queue], correlation_id: correlation_id, reply_to: queue.name, expiration: (@timeout*1000).to_i)
    end

    def close
      if defined?(@channel)
        channel.close
        remove_instance_variable :@channel
        remove_instance_variable :@queue
      end

      if defined?(@connection)
        connection.close
        remove_instance_variable :@connection
      end
    end

    private

    def start_subscription
      queue.subscribe(block: false) do |delivery_info, properties, payload|
        @timed_queue.push([properties.correlation_id, payload])
      end

      @subscribed = true
    end

    def connection
      return @connection if defined?(@connection)

      @connection = Bunny.new(@options[:bunny])
      @connection.start
      @connection
    end

    def channel
      @channel ||= connection.create_channel
    end

    # The exclusive no-name queue that is created for communicating back to me
    def queue
      @queue ||= channel.queue(generate_queue_name, exclusive: true)
    end

    def generate_queue_name
      [@options[:queue], '-client_', SecureRandom.hex].join
    end

    def exchange
      @exchange ||= channel.default_exchange
    end

    def default_correlation_id
      ''
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
easy_bunny_rpc-0.1.1 lib/easy_bunny_rpc/client.rb