lib/pigato/client.rb in pigato-0.2.13 vs lib/pigato/client.rb in pigato-0.2.14

- old
+ new

@@ -1,17 +1,13 @@ require 'thread' class Pigato::Client def initialize broker, conf = {} - @mtx = Mutex.new @broker = broker - @ctx = ZMQ::Context.new - @ctx.linger = 0 + @ctxs = {} - @sockets = {} - @conf = { :autostart => false, :timeout => 2500 } @@ -26,13 +22,14 @@ tid = "#" + Process.pid.to_s + "|" + Thread.current.object_id.to_s tid end def request service, request, opts = {} - socket = @sockets[getid()] - return nil if socket == nil; - + return nil if @ctxs[getid()] == nil + + socket = @ctxs[getid()]['socket'] + request = [Oj.dump(request), Oj.dump(opts)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) msg = ZMQ::Message.new @@ -51,11 +48,11 @@ return res[0] if res.length == 1 res end def _recv rid - socket = @sockets[getid()] + socket = @ctxs[getid()]['socket'] socket.rcvtimeo = @conf[:timeout] data = [] d1 = Time.now msg = socket.recv_message() while 1 do @@ -74,20 +71,22 @@ reconnect_to_broker end def stop tid = getid() - socket = @sockets[tid] - if socket - socket.close - @sockets.delete(tid) + if @ctxs[tid] + @ctxs[tid]['socket'].close + @ctxs[tid]['ctx'].destroy + @ctxs.delete(tid) end end def reconnect_to_broker stop - socket = @ctx.socket ZMQ::DEALER + ctx = ZMQ::Context.new + ctx.linger = 0 + socket = ctx.socket ZMQ::DEALER socket.identity = SecureRandom.uuid socket.connect @broker - @sockets[getid()] = socket + @ctxs[getid()] = { 'socket' => socket, 'ctx' => ctx } end end