lib/pigato/client.rb in pigato-0.2.13 vs lib/pigato/client.rb in pigato-0.2.14
- old
+ new
@@ -1,17 +1,13 @@
require 'thread'
class Pigato::Client
def initialize broker, conf = {}
- @mtx = Mutex.new
@broker = broker
- @ctx = ZMQ::Context.new
- @ctx.linger = 0
+ @ctxs = {}
- @sockets = {}
-
@conf = {
:autostart => false,
:timeout => 2500
}
@@ -26,13 +22,14 @@
tid = "#" + Process.pid.to_s + "|" + Thread.current.object_id.to_s
tid
end
def request service, request, opts = {}
- socket = @sockets[getid()]
- return nil if socket == nil;
-
+ return nil if @ctxs[getid()] == nil
+
+ socket = @ctxs[getid()]['socket']
+
request = [Oj.dump(request), Oj.dump(opts)]
rid = SecureRandom.uuid
request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
msg = ZMQ::Message.new
@@ -51,11 +48,11 @@
return res[0] if res.length == 1
res
end
def _recv rid
- socket = @sockets[getid()]
+ socket = @ctxs[getid()]['socket']
socket.rcvtimeo = @conf[:timeout]
data = []
d1 = Time.now
msg = socket.recv_message()
while 1 do
@@ -74,20 +71,22 @@
reconnect_to_broker
end
def stop
tid = getid()
- socket = @sockets[tid]
- if socket
- socket.close
- @sockets.delete(tid)
+ if @ctxs[tid]
+ @ctxs[tid]['socket'].close
+ @ctxs[tid]['ctx'].destroy
+ @ctxs.delete(tid)
end
end
def reconnect_to_broker
stop
- socket = @ctx.socket ZMQ::DEALER
+ ctx = ZMQ::Context.new
+ ctx.linger = 0
+ socket = ctx.socket ZMQ::DEALER
socket.identity = SecureRandom.uuid
socket.connect @broker
- @sockets[getid()] = socket
+ @ctxs[getid()] = { 'socket' => socket, 'ctx' => ctx }
end
end