lib/pigato/client.rb in pigato-0.2.11 vs lib/pigato/client.rb in pigato-0.2.12
- old
+ new
@@ -1,11 +1,17 @@
+require 'thread'
+
class Pigato::Client
def initialize broker, conf = {}
+ @mtx = Mutex.new
@broker = broker
- @socket = nil
+ @ctx = ZMQ::Context.new
+ @ctx.linger = 0
+ @sockets = {}
+
@conf = {
:autostart => false,
:timeout => 2500
}
@@ -14,20 +20,24 @@
if @conf[:autostart]
start
end
end
- def request service, request, opts = {}
- return nil if @socket == nil;
+
+ def request service, request, opts = {}
+ start if @sockets[Thread.current.object_id] == nil
+ socket = @sockets[Thread.current.object_id]
+
+ return nil if socket == nil;
request = [Oj.dump(request), Oj.dump(opts)]
rid = SecureRandom.uuid
request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
msg = ZMQ::Message.new
request.reverse.each{|p| msg.push(ZMQ::Frame(p))}
- @socket.send_message msg
+ socket.send_message msg
res = []
while 1 do
chunk = _recv rid
break if chunk == nil
@@ -39,14 +49,15 @@
return res[0] if res.length == 1
res
end
def _recv rid
- @socket.rcvtimeo = @conf[:timeout]
+ socket = @sockets[Thread.current.object_id]
+ socket.rcvtimeo = @conf[:timeout]
data = []
d1 = Time.now
- msg = @socket.recv_message()
+ msg = socket.recv_message()
while 1 do
break if !msg || msg.size == 0
data << msg.pop.data
end
data = [] if data[3] != rid
@@ -60,22 +71,20 @@
def start
reconnect_to_broker
end
def stop
- if @socket
- @socket.close
+ socket = @sockets[Thread.current.object_id]
+ if socket
+ socket.close
+ @sockets.delete(Thread.current.object_id)
end
- if @ctx
- @ctx.destroy
- end
end
def reconnect_to_broker
stop
- @ctx = ZMQ::Context.new
- @socket = @ctx.socket ZMQ::DEALER
- @ctx.linger = 0
- @socket.identity = SecureRandom.uuid
- @socket.connect @broker
+ socket = @ctx.socket ZMQ::DEALER
+ socket.identity = SecureRandom.uuid
+ socket.connect @broker
+ @sockets[Thread.current.object_id] = socket
end
end