lib/pigato/client.rb in pigato-0.2.14 vs lib/pigato/client.rb in pigato-0.2.15
- old
+ new
@@ -3,10 +3,11 @@
class Pigato::Client
def initialize broker, conf = {}
@broker = broker
@ctxs = {}
+ @sockets = {}
@conf = {
:autostart => false,
:timeout => 2500
}
@@ -16,19 +17,24 @@
if @conf[:autostart]
start
end
end
- def getid
- tid = "#" + Process.pid.to_s + "|" + Thread.current.object_id.to_s
+ def get_proc_id
+ pid = "#" + Process.pid.to_s
+ pid
+ end
+
+ def get_thread_id
+ tid = get_proc_id() + "|" + Thread.current.object_id.to_s
tid
end
def request service, request, opts = {}
- return nil if @ctxs[getid()] == nil
+ return nil if @sockets[get_thread_id()] == nil
- socket = @ctxs[getid()]['socket']
+ socket = @sockets[get_thread_id()]
request = [Oj.dump(request), Oj.dump(opts)]
rid = SecureRandom.uuid
request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
@@ -48,11 +54,11 @@
return res[0] if res.length == 1
res
end
def _recv rid
- socket = @ctxs[getid()]['socket']
+ socket = @sockets[get_thread_id()]
socket.rcvtimeo = @conf[:timeout]
data = []
d1 = Time.now
msg = socket.recv_message()
while 1 do
@@ -70,23 +76,23 @@
def start
reconnect_to_broker
end
def stop
- tid = getid()
- if @ctxs[tid]
- @ctxs[tid]['socket'].close
- @ctxs[tid]['ctx'].destroy
- @ctxs.delete(tid)
+ tid = get_thread_id()
+ if @sockets[tid]
+ @sockets[tid].close
end
end
def reconnect_to_broker
stop
- ctx = ZMQ::Context.new
+ ctx = @ctxs[get_proc_id()]
+ ctx = ZMQ::Context.new if ctx == nil
ctx.linger = 0
socket = ctx.socket ZMQ::DEALER
socket.identity = SecureRandom.uuid
socket.connect @broker
- @ctxs[getid()] = { 'socket' => socket, 'ctx' => ctx }
+ @sockets[get_thread_id()] = socket
+ @ctxs[get_proc_id()] = ctx
end
end