Sha256: 1b70b9c95268a9df25ee2e2d098c79ae1d960e0ae94e7c659139fea3094c4ceb
Contents?: true
Size: 1.87 KB
Versions: 2
Compression:
Stored size: 1.87 KB
Contents
require 'concurrent'
require 'oj'
require 'securerandom'
require 'uuidtools'

require 'fluffle/connectable'

module Fluffle
  # JSON-RPC 2.0 client that publishes requests to a request queue and waits
  # for the matching response on a per-client exclusive reply queue.
  #
  # Each in-flight request is tracked by its message ID in
  # `@pending_responses`; the reply-queue subscriber resolves the matching
  # `Concurrent::IVar` when a response carrying that ID arrives.
  class Client
    include Connectable

    # Connects to the broker and starts listening for responses.
    #
    # @param url [String] connection URL handed to `connect` (provided by
    #   `Connectable`; presumably it sets `@connection` - confirm there)
    def initialize(url:)
      self.connect url

      @uuid        = UUIDTools::UUID.timestamp_create.to_s
      @channel     = @connection.create_channel
      @exchange    = @channel.default_exchange
      @reply_queue = @channel.queue Fluffle.response_queue_name(@uuid), exclusive: true

      # Used for generating unique message IDs
      @prng = Random.new

      # Maps message ID => Concurrent::IVar awaiting that message's response
      @pending_responses = Concurrent::Map.new

      self.subscribe
    end

    # Subscribes to the reply queue, forwarding every delivery to
    # `handle_response`.
    def subscribe
      @reply_queue.subscribe do |delivery_info, properties, payload|
        self.handle_response delivery_info: delivery_info,
                             properties: properties,
                             payload: payload
      end
    end

    # Resolves the pending request that matches an incoming response.
    #
    # Responses that are not JSON objects, or whose `id` does not match a
    # pending request (unknown or already-resolved IDs), are ignored rather
    # than raising inside the subscriber.
    #
    # @param delivery_info [Object] delivery metadata (unused)
    # @param properties [Object] message properties (unused)
    # @param payload [String] JSON-encoded JSON-RPC response
    def handle_response(delivery_info:, properties:, payload:)
      # NOTE(review): this relies on Oj's default load mode; if responses can
      # come from an untrusted peer, consider an explicit strict mode - confirm.
      payload = Oj.load payload
      return unless payload.is_a?(Hash)

      ivar = @pending_responses.delete payload['id']
      return unless ivar

      ivar.set payload
    end

    # Backward-compatible alias for the original (misspelled) method name.
    alias_method :handle_resposne, :handle_response

    # Performs a remote procedure call and blocks until its response arrives.
    #
    # @param method [String] name of the remote method
    # @param params [Array] positional parameters for the remote method
    # @param queue [String] logical name of the request queue to publish to
    # @return [Object] the `result` member of the response (may be `false`
    #   or `nil` when that is what the remote method returned)
    # @raise [RuntimeError] if the response carries an `error` member or
    #   lacks a `result` member
    def call(method, params = [], queue: 'default')
      id = random_bytes_as_hex 8

      payload = {
        'jsonrpc' => '2.0',
        'id'      => id,
        'method'  => method,
        'params'  => params
      }

      # Register the IVar *before* publishing so that a response arriving
      # immediately still finds its pending entry.
      ivar = Concurrent::IVar.new
      @pending_responses[id] = ivar

      begin
        @exchange.publish Oj.dump(payload), routing_key: Fluffle.request_queue_name(queue),
                                            correlation_id: id,
                                            reply_to: @reply_queue.name
      rescue StandardError
        # Nothing will ever answer a request that was never sent
        @pending_responses.delete id
        raise
      end

      # Blocks (without a timeout) until the subscriber sets the response
      response = ivar.value

      error = response['error']

      # TODO: Raise known error subclass to be caught by client code
      raise "JSON-RPC error: #{describe_error(error)}" if error
      raise "JSON-RPC response for #{id} has neither result nor error" unless response.key?('result')

      response['result']
    end

    protected

    # @param bytes [Integer] number of random bytes to generate
    # @return [String] hex string twice as long as `bytes`
    def random_bytes_as_hex(bytes)
      # Adapted from `SecureRandom.hex`
      @prng.bytes(bytes).unpack('H*')[0]
    end

    # Builds a human-readable description of a JSON-RPC `error` member.
    def describe_error(error)
      return error.inspect unless error.is_a?(Hash)

      "#{error['message']} (code #{error['code'].inspect})"
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluffle-0.0.2 | lib/fluffle/client.rb |
fluffle-0.0.1 | lib/fluffle/client.rb |