lib/pigato/client.rb in pigato-0.2.1 vs lib/pigato/client.rb in pigato-0.2.2
- old
+ new
@@ -1,15 +1,11 @@
-require "oj"
-require "securerandom"
-
class Pigato::Client
def initialize broker, conf = {}
@broker = broker
@context = ZMQ::Context.new(1)
@socket = nil
- @poller = ZMQ::Poller.new
@conf = {
:autostart => false,
:timeout => 2500
}
@@ -26,11 +22,13 @@
request = [Oj.dump(request)]
rid = SecureRandom.uuid
request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
- @socket.send_strings request
+ msg = ZMQ::Message.new
+ request.reverse.each{|p| msg.push(ZMQ::Frame(p))}
+ @socket.send_message msg
res = []
while 1 do
chunk = _recv(rid, timeout)
break if chunk == nil
@@ -41,31 +39,24 @@
return res[0] if res.length === 1
res
end
def _recv rid, timeout = @timeout
- items = @poller.poll(timeout)
- if items
- msg = []
- d1 = Time.now
- while 1 do
- @socket.recv_strings(msg, ZMQ::DONTWAIT)
- msg = [] if msg.length < 5 || msg[3] != rid
- break if msg.length > 0 || ((Time.now - d1) * 1000 > timeout)
- sleep(1.0 / 50.0)
- end
+ @socket.rcvtimeo = timeout;
+ data = []
+ d1 = Time.now
+ msg = @socket.recv_message()
+ while 1 do
+ break if !msg || msg.size == 0
+ data << msg.pop.data
+ end
+ data = [] if data[3] != rid
- return nil if msg.length == 0
+ return nil if data.length == 0
- # header
- if msg.shift != Pigato::C_CLIENT
- raise RuntimeError, "Not a valid Pigato message"
- end
-
- return msg
- end
- nil
+ data.shift
+ return data
end
def start
reconnect_to_broker
end
@@ -74,15 +65,14 @@
$socket.close
end
def reconnect_to_broker
if @socket
- @poller.deregister @socket, ZMQ::DEALER
+ @socket.close
end
@socket = @context.socket ZMQ::DEALER
- @socket.setsockopt ZMQ::LINGER, 0
- @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
+ @context.linger = 0
+ @socket.identity = SecureRandom.uuid
@socket.connect @broker
- @poller.register @socket, ZMQ::POLLIN
end
end