Sha256: ca1a8e97efebf8f1842b1e06009360e0d666dbf6766c2316ff58149d6adc83b8

Contents?: true

Size: 1.47 KB

Versions: 1

Compression:

Stored size: 1.47 KB

Contents

require "oj"
require "securerandom"

class PigatoClient

  def initialize broker
    @broker = broker
    @context = ZMQ::Context.new(1)
    @client = nil
    @poller = ZMQ::Poller.new
    @timeout = 2500

    reconnect_to_broker
  end

  def send service, request, timeout = @timeout
    request = [Oj.dump(request)]

    rid = SecureRandom.uuid
    request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
    @client.send_strings request

    res = [] 
    while 1 do
      chunk = _recv(rid, timeout)
      break if chunk == nil
      res << Oj.load(chunk[4])
      break if chunk[0] == Pigato::W_REPLY
    end

    res
  end

  def _recv rid, timeout = @timeout
    items = @poller.poll(timeout)
    if items 
      msg = []
      d1 = Time.now
      while 1 do
        @client.recv_strings(msg, ZMQ::DONTWAIT)
        msg = [] if msg.length < 5 || msg[3] != rid
        break if msg.length > 0 || ((Time.now - d1) * 1000 > timeout)
        sleep(1.0 / 50.0)
      end

      return nil if msg.length == 0

      # header
      if msg.shift != Pigato::C_CLIENT
        raise RuntimeError, "Not a valid Pigato message"
      end

      return msg 
    end
    nil
  end

  def reconnect_to_broker
    if @client
      @poller.deregister @client, ZMQ::DEALER
    end

    @client = @context.socket ZMQ::DEALER
    @client.setsockopt ZMQ::LINGER, 0
    @client.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
    @client.connect @broker
    @poller.register @client, ZMQ::POLLIN
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pigato-0.1.7 lib/pigato/client.rb