Sha256: f0beadc23ea32cf92101eaca0f61da94b980e2be96e0a1cd4ae687bf81694016

Contents?: true

Size: 1.58 KB

Versions: 1

Compression:

Stored size: 1.58 KB

Contents

require "oj"
require "securerandom"

class Pigato::Client

  def initialize broker
    @broker = broker
    @context = ZMQ::Context.new(1)
    @socket = nil
    @poller = ZMQ::Poller.new
    @timeout = 2500

    start
  end

  def request service, request, timeout = @timeout
    request = [Oj.dump(request)]

    rid = SecureRandom.uuid
    request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
    @socket.send_strings request

    res = [] 
    while 1 do
      chunk = _recv(rid, timeout)
      break if chunk == nil
      res << Oj.load(chunk[4])
      break if chunk[0] == Pigato::W_REPLY
    end

    return res[0] if res.length === 1
    res
  end

  def _recv rid, timeout = @timeout
    items = @poller.poll(timeout)
    if items 
      msg = []
      d1 = Time.now
      while 1 do
        @socket.recv_strings(msg, ZMQ::DONTWAIT)
        msg = [] if msg.length < 5 || msg[3] != rid
        break if msg.length > 0 || ((Time.now - d1) * 1000 > timeout)
        sleep(1.0 / 50.0)
      end

      return nil if msg.length == 0

      # header
      if msg.shift != Pigato::C_CLIENT
        raise RuntimeError, "Not a valid Pigato message"
      end

      return msg 
    end
    nil
  end
  
  def start
    reconnect_to_broker
  end

  def stop
    $socket.close
  end

  def reconnect_to_broker
    if @socket
      @poller.deregister @socket, ZMQ::DEALER
    end

    @socket = @context.socket ZMQ::DEALER
    @socket.setsockopt ZMQ::LINGER, 0
    @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
    @socket.connect @broker
    @poller.register @socket, ZMQ::POLLIN
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pigato-0.2.0 lib/pigato/client.rb