Sha256: 1b70b9c95268a9df25ee2e2d098c79ae1d960e0ae94e7c659139fea3094c4ceb

Contents?: true

Size: 1.87 KB

Versions: 2

Compression:

Stored size: 1.87 KB

Contents

require 'concurrent'
require 'oj'
require 'securerandom'
require 'uuidtools'

require 'fluffle/connectable'

module Fluffle
  class Client
    include Connectable

    def initialize(url:)
      self.connect url

      @uuid        = UUIDTools::UUID.timestamp_create.to_s
      @channel     = @connection.create_channel
      @exchange    = @channel.default_exchange
      @reply_queue = @channel.queue Fluffle.response_queue_name(@uuid), exclusive: true

      # Used for generating unique message IDs
      @prng = Random.new

      @pending_responses = Concurrent::Map.new

      self.subscribe
    end

    def subscribe
      @reply_queue.subscribe do |delivery_info, properties, payload|
        self.handle_resposne delivery_info: delivery_info,
                             properties: properties,
                             payload: payload
      end
    end

    def handle_resposne(delivery_info:, properties:, payload:)
      payload  = Oj.load payload

      ivar = @pending_responses.delete payload['id']
      ivar.set payload
    end

    def call(method, params = [], queue: 'default')
      id = random_bytes_as_hex 8

      payload = {
        'jsonrpc' => '2.0',
        'id'      => id,
        'method'  => method,
        'params'  => params
      }

      @exchange.publish Oj.dump(payload), routing_key: Fluffle.request_queue_name(queue),
                                          correlation_id: id,
                                          reply_to: @reply_queue.name

      ivar = Concurrent::IVar.new
      @pending_responses[id] = ivar

      response = ivar.value

      if response['result']
        response['result']
      else
        raise # TODO: Raise known error subclass to be caught by client code
      end
    end

    protected

    def random_bytes_as_hex(bytes)
      # Adapted from `SecureRandom.hex`
      @prng.bytes(bytes).unpack('H*')[0]
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluffle-0.0.2 lib/fluffle/client.rb
fluffle-0.0.1 lib/fluffle/client.rb