Sha256: 235cfc5d46b251bc892ec7779efd8b13aa1b0392a3acea27a1404612a80e1ec8

Contents?: true

Size: 1.55 KB

Versions: 1

Compression:

Stored size: 1.55 KB

Contents

module Pigato
  # Client side of the Pigato request/reply protocol.
  #
  # Holds a single ZMQ DEALER socket connected to the broker and exposes a
  # blocking request call (#send) that collects every reply chunk for that
  # request.
  class Client
    include MDP

    # Default poll timeout used by #send when the caller passes none.
    # The value is handed unchanged to ZMQ::Poller#poll (milliseconds in
    # ffi-rzmq -- confirm if another binding is used).
    attr_accessor :timeout

    # @param broker [String] endpoint handed to the socket's `connect`
    def initialize(broker)
      @broker = broker
      @context = ZMQ::Context.new(1)
      @client = nil
      @poller = ZMQ::Poller.new
      @timeout = 2500

      reconnect_to_broker
    end

    # Sends +request+ (serialized with #to_json) to +service+ and waits for
    # the reply chunks.
    #
    # NOTE: this method shadows Object#send on Client instances; use
    # #__send__ / #public_send for dynamic dispatch on a client.
    #
    # @param service [String] service name frame
    # @param request [#to_json] payload to serialize
    # @param timeout [Integer] poll timeout per chunk; defaults to #timeout
    # @return [Array, nil] +[rid, chunks]+ where +rid+ is the generated
    #   request id and +chunks+ holds frame 4 of every received message up to
    #   and including the one whose command is MDP::W_REPLY; +nil+ if a poll
    #   timed out before that final message arrived
    # @raise [RuntimeError] if a received message lacks the client header
    def send(service, request, timeout = @timeout)
      rid = "RID#{rand() * 1_000_000}"

      # Frames sent, in order:
      #   0: MDP::C_CLIENT  (protocol header)
      #   1: MDP::W_REQUEST (command)
      #   2: service name
      #   3: request id
      #   4: JSON-encoded payload
      frames = [MDP::C_CLIENT, MDP::W_REQUEST, service, rid, request.to_json]
      @client.send_strings frames

      data = []
      loop do
        chunk = _recv(timeout)
        # _recv yields nil when nothing became readable in time; give up
        # instead of indexing into nil.
        return nil unless chunk

        data << chunk[4]
        break if chunk[0] == MDP::W_REPLY
      end

      [rid, data]
    end

    # Waits up to +timeout+ for one message and returns its frames with the
    # leading header frame removed.
    #
    # @param timeout [Integer] passed to ZMQ::Poller#poll
    # @return [Array<String>, nil] remaining frames, or +nil+ when the poll
    #   reported no readable socket
    # @raise [RuntimeError] if the first frame is not MDP::C_CLIENT
    def _recv(timeout)
      ready = @poller.poll(timeout)
      # poll returns a count (0 on timeout, negative on error); 0 is truthy
      # in Ruby, so it must be compared explicitly or the timeout is ignored
      # and recv_strings blocks.
      return nil unless ready && ready > 0

      messages = []
      @client.recv_strings messages

      # header
      raise RuntimeError, "Not a valid MDP message" if messages.shift != MDP::C_CLIENT

      messages
    end

    # (Re)creates the DEALER socket, connects it to the broker and registers
    # it with the poller for readability. Any previous socket is removed from
    # the poller and closed first.
    def reconnect_to_broker
      if @client
        # deregister expects the event mask used at registration time, not a
        # socket type.
        @poller.deregister @client, ZMQ::POLLIN
        @client.close
      end

      @client = @context.socket ZMQ::DEALER
      @client.setsockopt ZMQ::LINGER, 0
      @client.setsockopt ZMQ::IDENTITY, "C#{rand() * 10}"
      @client.connect @broker
      @poller.register @client, ZMQ::POLLIN
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
pigato-0.1.0 lib/pigato/client.rb