Sha256: dd1437eb6b07c96cb50b950fa0825177274b50c4738036a0cd4458f5c1ab281b

Contents?: true

Size: 1.74 KB

Versions: 1

Compression:

Stored size: 1.74 KB

Contents

require "oj"
require "securerandom"

class Pigato::Client

  def initialize broker, conf = {}
    @broker = broker
    @context = ZMQ::Context.new(1)
    @socket = nil
    @poller = ZMQ::Poller.new

    @conf = {
      :autostart => false,
      :timeout => 2500
    }

    @conf.merge!(conf)

    if @conf[:autostart]
      start
    end
  end

  def request service, request, timeout = @conf[:timeout]
    return nil if @socket == nil;

    request = [Oj.dump(request)]

    rid = SecureRandom.uuid
    request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
    @socket.send_strings request

    res = [] 
    while 1 do
      chunk = _recv(rid, timeout)
      break if chunk == nil
      res << Oj.load(chunk[4])
      break if chunk[0] == Pigato::W_REPLY
    end

    return res[0] if res.length === 1
    res
  end

  def _recv rid, timeout = @timeout
    items = @poller.poll(timeout)
    if items 
      msg = []
      d1 = Time.now
      while 1 do
        @socket.recv_strings(msg, ZMQ::DONTWAIT)
        msg = [] if msg.length < 5 || msg[3] != rid
        break if msg.length > 0 || ((Time.now - d1) * 1000 > timeout)
        sleep(1.0 / 50.0)
      end

      return nil if msg.length == 0

      # header
      if msg.shift != Pigato::C_CLIENT
        raise RuntimeError, "Not a valid Pigato message"
      end

      return msg 
    end
    nil
  end
  
  def start
    reconnect_to_broker
  end

  def stop
    $socket.close
  end

  def reconnect_to_broker
    if @socket
      @poller.deregister @socket, ZMQ::DEALER
    end

    @socket = @context.socket ZMQ::DEALER
    @socket.setsockopt ZMQ::LINGER, 0
    @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
    @socket.connect @broker
    @poller.register @socket, ZMQ::POLLIN
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pigato-0.2.1 lib/pigato/client.rb