lib/pigato/client.rb in pigato-0.2.11 vs lib/pigato/client.rb in pigato-0.2.12

- old
+ new

@@ -1,11 +1,17 @@ +require 'thread' + class Pigato::Client def initialize broker, conf = {} + @mtx = Mutex.new @broker = broker - @socket = nil + @ctx = ZMQ::Context.new + @ctx.linger = 0 + @sockets = {} + @conf = { :autostart => false, :timeout => 2500 } @@ -14,20 +20,24 @@ if @conf[:autostart] start end end - def request service, request, opts = {} - return nil if @socket == nil; + + def request service, request, opts = {} + start if @sockets[Thread.current.object_id] == nil + socket = @sockets[Thread.current.object_id] + + return nil if socket == nil; request = [Oj.dump(request), Oj.dump(opts)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) msg = ZMQ::Message.new request.reverse.each{|p| msg.push(ZMQ::Frame(p))} - @socket.send_message msg + socket.send_message msg res = [] while 1 do chunk = _recv rid break if chunk == nil @@ -39,14 +49,15 @@ return res[0] if res.length == 1 res end def _recv rid - @socket.rcvtimeo = @conf[:timeout] + socket = @sockets[Thread.current.object_id] + socket.rcvtimeo = @conf[:timeout] data = [] d1 = Time.now - msg = @socket.recv_message() + msg = socket.recv_message() while 1 do break if !msg || msg.size == 0 data << msg.pop.data end data = [] if data[3] != rid @@ -60,22 +71,20 @@ def start reconnect_to_broker end def stop - if @socket - @socket.close + socket = @sockets[Thread.current.object_id] + if socket + socket.close + @sockets.delete(Thread.current.object_id) end - if @ctx - @ctx.destroy - end end def reconnect_to_broker stop - @ctx = ZMQ::Context.new - @socket = @ctx.socket ZMQ::DEALER - @ctx.linger = 0 - @socket.identity = SecureRandom.uuid - @socket.connect @broker + socket = @ctx.socket ZMQ::DEALER + socket.identity = SecureRandom.uuid + socket.connect @broker + @sockets[Thread.current.object_id] = socket end end