Sha256: 5a4a0b65c5fc7e1eff0d5dab963e5238784d0a4a2f7ee02b493cb7876776266d
Contents?: true
Size: 1.44 KB
Versions: 1
Compression:
Stored size: 1.44 KB
Contents
class Pigato::Client def initialize broker, conf = {} @broker = broker @context = ZMQ::Context.new(1) @socket = nil @conf = { :autostart => false, :timeout => 2500 } @conf.merge!(conf) if @conf[:autostart] start end end def request service, request, timeout = @conf[:timeout] return nil if @socket == nil; request = [Oj.dump(request)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) msg = ZMQ::Message.new request.reverse.each{|p| msg.push(ZMQ::Frame(p))} @socket.send_message msg res = [] while 1 do chunk = _recv(rid, timeout) break if chunk == nil res << Oj.load(chunk[4]) break if chunk[0] == Pigato::W_REPLY end return res[0] if res.length == 1 res end def _recv rid, timeout = @timeout @socket.rcvtimeo = timeout; data = [] d1 = Time.now msg = @socket.recv_message() while 1 do break if !msg || msg.size == 0 data << msg.pop.data end data = [] if data[3] != rid return nil if data.length == 0 data.shift return data end def start reconnect_to_broker end def stop $socket.close end def reconnect_to_broker if @socket @socket.close end @socket = @context.socket ZMQ::DEALER @context.linger = 0 @socket.identity = SecureRandom.uuid @socket.connect @broker end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
pigato-0.2.3 | lib/pigato/client.rb |