Sha256: ed593af6105589336666fd485eb85a11ba325bfb6a858b278cb09ccb2d8d496f

Contents?: true

Size: 1.25 KB

Versions: 1

Compression:

Stored size: 1.25 KB

Contents

require 'securerandom'

module SneakersPacker
  # Subscribes to a dedicated reply queue and hands every incoming RPC reply
  # to the pending request that carries the same correlation id.
  #
  # Constructing an instance immediately:
  # 1. makes sure the publisher is connected,
  # 2. declares an exclusive queue named "rpc.<uuid>" on the publisher's channel,
  # 3. binds that queue to the publisher's exchange, using the queue name as
  #    the routing key,
  # 4. subscribes to the queue with automatic acknowledgement.
  class RpcReplySubscriber
    # @param client [#request_hash, #client_lock] owner of the pending requests.
    #   +request_hash+ is indexed by correlation id and yields objects that
    #   respond to +response=+, +set_processed!+ and +condition+; +client_lock+
    #   is the lock held while signalling that condition.
    # @param publisher [Object] object whose +@mutex+, +@channel+ and
    #   +@exchange+ instance variables and +ensure_connection!+ / +connected?+
    #   methods are used to reach the broker.
    def initialize(client, publisher)
      @client = client
      @publisher = publisher
      @queue_name = "rpc.#{SecureRandom.uuid}"

      initialize_reply_queue
    end

    # @return [String] name of the reply queue: "rpc." followed by a random UUID.
    def reply_queue_name
      @queue_name
    end

    private

    # Connects the publisher if necessary, then builds the reply queue on the
    # publisher's channel and exchange.
    def initialize_reply_queue
      # Evaluated in the publisher's own context so that its @mutex and its
      # connection helpers are reachable (presumably they are private there;
      # verify against the publisher class).
      @publisher.instance_eval do
        @mutex.synchronize { ensure_connection! unless connected? }
      end

      # Read only after the connection step above has run.
      build_reply_queue(
        @publisher.instance_variable_get(:@channel),
        @publisher.instance_variable_get(:@exchange)
      )
    end

    # Declares, binds and subscribes to the reply queue.
    #
    # @param channel [#queue] channel used to declare the queue.
    # @param exchange [Object] exchange the queue is bound to.
    # @return [Object] whatever the queue's +subscribe+ call returns.
    def build_reply_queue(channel, exchange)
      @reply_queue = channel.queue(@queue_name, exclusive: true)
      @reply_queue.bind(exchange, routing_key: @reply_queue.name)

      # Captured in a local so the handler does not depend on what +self+ is
      # when the queue library invokes the block.
      client = @client

      @reply_queue.subscribe(manual_ack: false) do |_delivery_info, properties, payload|
        correlation_id = properties[:correlation_id]
        request = client.request_hash[correlation_id]

        if request
          # Store the result before signalling so that the woken waiter sees it.
          request.response = payload
          request.set_processed!
          client.client_lock.synchronize { request.condition.signal }
        else
          Sneakers.logger.warn "#{correlation_id}'s request is not found"
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sneakers_packer-0.1.5 lib/sneakers_packer/rpc_reply_subscriber.rb