lib/pigato/client.rb in pigato-0.2.27 vs lib/pigato/client.rb in pigato-0.3.0
- old
+ new
@@ -1,41 +1,35 @@
-require 'thread'
+require "#{File.dirname(__FILE__)}/base.rb"
-class Pigato::Client
+class Pigato::Client < Pigato::Base
+ @@mtx = Mutex.new
+ @@ctxs = {}
+ @@sockets = {}
+
def initialize broker, conf = {}
@broker = broker
- @ctxs = {}
- @sockets = {}
@conf = {
:autostart => false,
:timeout => 2500
}
@conf.merge!(conf)
+ init
+
if @conf[:autostart]
start
end
end
- def get_proc_id
- pid = "#" + Process.pid.to_s
- pid
- end
-
- def get_thread_id
- tid = "#" + get_proc_id() + "#" + Thread.current.object_id.to_s
- tid
- end
-
def request service, request, opts = {}
- return nil if @sockets[get_thread_id()] == nil
+ iid = get_iid
+ return nil if @@sockets[iid] == nil
- socket = @sockets[get_thread_id()]
-
+ socket = @@sockets[iid]
request = [Oj.dump(request), Oj.dump(opts)]
rid = SecureRandom.uuid
request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request)
msg = ZMQ::Message.new
@@ -54,12 +48,14 @@
return res[0] if res.length == 1
res
end
def _recv rid
- socket = @sockets[get_thread_id()]
+ iid = get_iid
+ socket = @@sockets[iid]
socket.rcvtimeo = @conf[:timeout]
+
data = []
d1 = Time.now
msg = socket.recv_message()
while 1 do
break if !msg || msg.size == 0
@@ -72,37 +68,17 @@
data.shift
return data
end
def start
- reconnect_to_broker
- end
-
- def stop
- tid = get_thread_id()
- if @sockets[tid]
- @sockets[tid].close
- @sockets.delete(tid)
- end
-
- pid = get_proc_id()
- if @ctxs[pid]
- @ctxs[pid].destroy
- @ctxs.delete(pid)
- end
- end
-
- def reconnect_to_broker
stop
-
- ctx = ZMQ::Context.new
- ctx.linger = 0
- @ctxs[get_proc_id()] = ctx
-
- socket = ctx.socket ZMQ::DEALER
- socket.identity = SecureRandom.uuid
- socket.connect @broker
- @sockets[get_thread_id()] = socket
+ sock_create
+ super
rescue ZMQ::Error => e
puts e
+ end
+
+ def stop
+ sock_close
+ super
end
end