lib/pigato/client.rb in pigato-0.2.14 vs lib/pigato/client.rb in pigato-0.2.15

- old
+ new

@@ -3,10 +3,11 @@ class Pigato::Client def initialize broker, conf = {} @broker = broker @ctxs = {} + @sockets = {} @conf = { :autostart => false, :timeout => 2500 } @@ -16,19 +17,24 @@ if @conf[:autostart] start end end - def getid - tid = "#" + Process.pid.to_s + "|" + Thread.current.object_id.to_s + def get_proc_id + pid = "#" + Process.pid.to_s + pid + end + + def get_thread_id + tid = get_proc_id() + "|" + Thread.current.object_id.to_s tid end def request service, request, opts = {} - return nil if @ctxs[getid()] == nil + return nil if @sockets[get_thread_id()] == nil - socket = @ctxs[getid()]['socket'] + socket = @sockets[get_thread_id()] request = [Oj.dump(request), Oj.dump(opts)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) @@ -48,11 +54,11 @@ return res[0] if res.length == 1 res end def _recv rid - socket = @ctxs[getid()]['socket'] + socket = @sockets[get_thread_id()] socket.rcvtimeo = @conf[:timeout] data = [] d1 = Time.now msg = socket.recv_message() while 1 do @@ -70,23 +76,23 @@ def start reconnect_to_broker end def stop - tid = getid() - if @ctxs[tid] - @ctxs[tid]['socket'].close - @ctxs[tid]['ctx'].destroy - @ctxs.delete(tid) + tid = get_thread_id() + if @sockets[tid] + @sockets[tid].close end end def reconnect_to_broker stop - ctx = ZMQ::Context.new + ctx = @ctxs[get_proc_id()] + ctx = ZMQ::Context.new if ctx == nil ctx.linger = 0 socket = ctx.socket ZMQ::DEALER socket.identity = SecureRandom.uuid socket.connect @broker - @ctxs[getid()] = { 'socket' => socket, 'ctx' => ctx } + @sockets[get_thread_id()] = socket + @ctxs[get_proc_id()] = ctx end end