Sha256: 565ab3cfa6fce45af089bff7462e14c5d872ac500a85273144ba1598a949339d

Contents?: true

Size: 1.24 KB

Versions: 2

Compression:

Stored size: 1.24 KB

Contents

require "json"
require "ffi-rzmq"

class PigatoClient

  def initialize broker
    @broker = broker
    @context = ZMQ::Context.new(1)
    @client = nil
    @poller = ZMQ::Poller.new

    reconnect_to_broker
  end

  def send service, request, timeout = 2500
    request = [request.to_json]

    rid = 'RID' + (rand() * 1000000).to_s
    request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
    @client.send_strings request

    res = Array.new
    res << rid

    data = Array.new
    while 1 do
      chunk = _recv(timeout)
      data << chunk[4]
      break if chunk[0] == Pigato::W_REPLY
    end

    res << data
    res
  end

  def _recv timeout
    items = @poller.poll(timeout)
    if items
      messages = []
      @client.recv_strings messages

      # header
      if messages.shift != Pigato::C_CLIENT
        raise RuntimeError, "Not a valid Pigato message"
      end

      return messages
    end

    nil
  end

  def reconnect_to_broker
    if @client
      @poller.deregister @client, ZMQ::DEALER
    end

    @client = @context.socket ZMQ::DEALER
    @client.setsockopt ZMQ::LINGER, 0
    @client.setsockopt ZMQ::IDENTITY, "C" + (rand() * 10).to_s
    @client.connect @broker
    @poller.register @client, ZMQ::POLLIN
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pigato-0.1.2 lib/pigato/client.rb
pigato-0.1.1 lib/pigato/client.rb