Sha256: 24393f1ad0bc378c72b87db944ffc7e404baf4d81d20f21e13b53f2a5268a595

Contents?: true

Size: 1.66 KB

Versions: 4

Compression:

Stored size: 1.66 KB

Contents

module Appfuel
  module Service
    # Request/reply client layered on Sneakers::Publisher. A request is
    # published as JSON with a fresh correlation id and a reply_to queue; the
    # calling thread then blocks until a message carrying the same correlation
    # id is delivered to this client's reply queue.
    #
    # Only one outstanding call is tracked per instance (a single +call_id+ /
    # +response+ pair), so an instance should not be shared by threads that
    # call #publish concurrently.
    class RpcClient < Sneakers::Publisher
      attr_accessor :call_id, :response
      attr_reader :channel, :config, :exchange, :reply_queue, :lock, :condition

      # Publish +msg+ and block until the matching reply arrives.
      #
      # @param to_queue [String] used as the routing key of the request
      # @param action_route [Object] sent in the +action_route+ header
      # @param msg [#to_json] request payload, serialized with +to_json+
      # @param headers [Hash] extra headers; merged over the +action_route+
      #   header, so a caller-supplied +:action_route+ key wins
      # @return [Object] the result of
      #   Appfuel::ResponseHandler#create_response for the parsed reply
      # @raise [JSON::ParserError] if the reply payload is not valid JSON
      def publish(to_queue, action_route, msg, headers = {})
        # NOTE(review): @mutex and connected? are not defined in this file;
        # presumably both come from Sneakers::Publisher - confirm.
        @mutex.synchronize do
          ensure_connection! unless connected?
        end

        self.call_id = SecureRandom.uuid
        @response    = nil
        params       = {
          routing_key:    to_queue,
          correlation_id: call_id,
          reply_to:       reply_queue.name,
          content_type:   'application/json',
          headers:        {action_route: action_route}.merge(headers)
        }

        exchange.publish(msg.to_json, params)
        await_response

        result = JSON.parse(@response)
        Appfuel::ResponseHandler.new.create_response(result)
      end

      private

      # Block until the subscriber has stored a response for the current call.
      #
      # The wait is guarded by a predicate: if the reply was delivered before
      # this thread got here, @response is already set and we return at once
      # instead of waiting for a signal that has already been sent. The loop
      # also re-waits after any wakeup that did not come with a response.
      def await_response
        lock.synchronize do
          condition.wait(lock) while @response.nil?
        end
      end

      # Establish the connection via the parent class, then create the
      # server-named exclusive reply queue and start consuming from it.
      def ensure_connection!
        super
        @reply_queue = channel.queue('', exclusive: true)
        subscribe
      end

      # Bind the reply queue to the configured exchange under its own name and
      # register the handler that hands replies back to the waiting #publish.
      def subscribe
        @lock      = Mutex.new
        @condition = ConditionVariable.new
        that       = self

        reply_queue.bind(@opts[:exchange], routing_key: reply_queue.name)

        reply_queue.subscribe do |_delivery_info, properties, payload|
          correlation_id = properties[:correlation_id]

          if correlation_id == that.call_id
            # Store and signal under the same lock the waiter holds while it
            # checks the predicate, so the update cannot slip in between the
            # waiter's check and its call to wait.
            that.lock.synchronize do
              that.response = payload.to_s
              that.condition.signal
            end
          else
            Sneakers.logger.warn "request not found for correlation_id: " \
                                 "(#{correlation_id})"
          end
        end
      end
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
appfuel-service-0.2.5 lib/appfuel/service/rpc_client.rb
appfuel-service-0.2.3 lib/appfuel/service/rpc_client.rb
appfuel-service-0.2.2 lib/appfuel/service/rpc_client.rb
appfuel-service-0.2.1 lib/appfuel/service/rpc_client.rb