lib/pigato/client.rb in pigato-0.2.27 vs lib/pigato/client.rb in pigato-0.3.0

- old
+ new

@@ -1,41 +1,35 @@ -require 'thread' +require "#{File.dirname(__FILE__)}/base.rb" -class Pigato::Client +class Pigato::Client < Pigato::Base + @@mtx = Mutex.new + @@ctxs = {} + @@sockets = {} + def initialize broker, conf = {} @broker = broker - @ctxs = {} - @sockets = {} @conf = { :autostart => false, :timeout => 2500 } @conf.merge!(conf) + init + if @conf[:autostart] start end end - def get_proc_id - pid = "#" + Process.pid.to_s - pid - end - - def get_thread_id - tid = "#" + get_proc_id() + "#" + Thread.current.object_id.to_s - tid - end - def request service, request, opts = {} - return nil if @sockets[get_thread_id()] == nil + iid = get_iid + return nil if @@sockets[iid] == nil - socket = @sockets[get_thread_id()] - + socket = @@sockets[iid] request = [Oj.dump(request), Oj.dump(opts)] rid = SecureRandom.uuid request = [Pigato::C_CLIENT, Pigato::W_REQUEST, service, rid].concat(request) msg = ZMQ::Message.new @@ -54,12 +48,14 @@ return res[0] if res.length == 1 res end def _recv rid - socket = @sockets[get_thread_id()] + iid = get_iid + socket = @@sockets[iid] socket.rcvtimeo = @conf[:timeout] + data = [] d1 = Time.now msg = socket.recv_message() while 1 do break if !msg || msg.size == 0 @@ -72,37 +68,17 @@ data.shift return data end def start - reconnect_to_broker - end - - def stop - tid = get_thread_id() - if @sockets[tid] - @sockets[tid].close - @sockets.delete(tid) - end - - pid = get_proc_id() - if @ctxs[pid] - @ctxs[pid].destroy - @ctxs.delete(pid) - end - end - - def reconnect_to_broker stop - - ctx = ZMQ::Context.new - ctx.linger = 0 - @ctxs[get_proc_id()] = ctx - - socket = ctx.socket ZMQ::DEALER - socket.identity = SecureRandom.uuid - socket.connect @broker - @sockets[get_thread_id()] = socket + sock_create + super rescue ZMQ::Error => e puts e + end + + def stop + sock_close + super end end