lib/pigato/client.rb in pigato-0.2.1 vs lib/pigato/client.rb in pigato-0.2.2

- old
+ new

@@ -1,15 +1,11 @@ -require "oj" -require "securerandom" - class Pigato::Client def initialize broker, conf = {} @broker = broker @context = ZMQ::Context.new(1) @socket = nil - @poller = ZMQ::Poller.new @conf = { :autostart => false, :timeout => 2500 } @@ -26,11 +22,13 @@ request = [Oj.dump(request)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) - @socket.send_strings request + msg = ZMQ::Message.new + request.reverse.each{|p| msg.push(ZMQ::Frame(p))} + @socket.send_message msg res = [] while 1 do chunk = _recv(rid, timeout) break if chunk == nil @@ -41,31 +39,24 @@ return res[0] if res.length === 1 res end def _recv rid, timeout = @timeout - items = @poller.poll(timeout) - if items - msg = [] - d1 = Time.now - while 1 do - @socket.recv_strings(msg, ZMQ::DONTWAIT) - msg = [] if msg.length < 5 || msg[3] != rid - break if msg.length > 0 || ((Time.now - d1) * 1000 > timeout) - sleep(1.0 / 50.0) - end + @socket.rcvtimeo = timeout; + data = [] + d1 = Time.now + msg = @socket.recv_message() + while 1 do + break if !msg || msg.size == 0 + data << msg.pop.data + end + data = [] if data[3] != rid - return nil if msg.length == 0 + return nil if data.length == 0 - # header - if msg.shift != Pigato::C_CLIENT - raise RuntimeError, "Not a valid Pigato message" - end - - return msg - end - nil + data.shift + return data end def start reconnect_to_broker end @@ -74,15 +65,14 @@ $socket.close end def reconnect_to_broker if @socket - @poller.deregister @socket, ZMQ::DEALER + @socket.close end @socket = @context.socket ZMQ::DEALER - @socket.setsockopt ZMQ::LINGER, 0 - @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid + @context.linger = 0 + @socket.identity = SecureRandom.uuid @socket.connect @broker - @poller.register @socket, ZMQ::POLLIN end end