Sha256: 24393f1ad0bc378c72b87db944ffc7e404baf4d81d20f21e13b53f2a5268a595
Contents?: true
Size: 1.66 KB
Versions: 4
Compression:
Stored size: 1.66 KB
Contents
module Appfuel module Service class RpcClient < Sneakers::Publisher attr_accessor :call_id, :response attr_reader :channel, :config, :exchange, :reply_queue, :lock, :condition def publish(to_queue, action_route, msg, headers = {}) @mutex.synchronize do ensure_connection! unless connected? end self.call_id = SecureRandom.uuid msg = msg.to_json @response = nil params = { routing_key: to_queue, correlation_id: call_id, reply_to: reply_queue.name, content_type: 'application/json', headers: {action_route: action_route}.merge(headers) } exchange.publish(msg, params) lock.synchronize { condition.wait(lock) } result = JSON.parse(@response) Appfuel::ResponseHandler.new.create_response(result) end private def ensure_connection! super @reply_queue = channel.queue('', exclusive: true) subscribe end def subscribe @lock = Mutex.new @condition = ConditionVariable.new that = self reply_queue.bind(@opts[:exchange], routing_key: reply_queue.name) reply_queue.subscribe do |_delivery_info, properties, payload| if properties[:correlation_id] == that.call_id that.response = payload.to_s that.lock.synchronize { that.condition.signal } else Sneakers.logger.warn "request not found for correlation_id: " + "(#{properties[:correlation_id]}" end end end end end end
Version data entries
4 entries across 4 versions & 1 rubygems