lib/pigato/worker.rb in pigato-0.4.2 vs lib/pigato/worker.rb in pigato-0.4.3

- old
+ new

@@ -20,83 +20,99 @@ @liveness = 0 @heartbeat_at = 0 @reply_to = nil @reply_rid = nil @reply_service = nil - + init if @conf[:autostart] start end + + Thread.new do + client = Pigato::Client.new(broker, { :autostart => true }) + loop do + @@mtx.lock + begin + if Time.now > @@global_heartbeat_at + @@sockets_ids.each do |iid, sid| + request = [Pigato::C_CLIENT, Pigato::W_HEARTBEAT, "worker", sid] + msg = ZMQ::Message.new + request.reverse.each{|p| msg.push(ZMQ::Frame(p))} + client.send msg + end + @@global_heartbeat_at = Time.now + 2 + end + rescue => e + puts e + end + @@mtx.unlock + sleep 2 + end + end end def reply reply reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)]) send Pigato::W_REPLY, reply end - + def recv + val = nil - loop do - @reply_rid = nil - @reply_to = nil - @reply_service = nil + @reply_rid = nil + @reply_to = nil + @reply_service = nil - iid = get_iid - - start if @@sockets[iid] == nil && @conf[:autostart] + iid = get_iid - socket = get_socket - return nil if socket.nil? + start if @@sockets[iid] == nil && @conf[:autostart] - socket.rcvtimeo = @conf[:timeout] + socket = get_socket + return nil if socket.nil? - msg = socket.recv_message + socket.rcvtimeo = @conf[:timeout] - return nil if msg.nil? + msg = socket.recv_message - if msg && msg.size - @liveness = HEARTBEAT_LIVENESS + if msg && msg.size + @liveness = HEARTBEAT_LIVENESS - header = msg.pop.data - if header != Pigato::W_WORKER - puts "E: Header is not Pigato::WORKER" - next - end + header = msg.pop.data + if header != Pigato::W_WORKER + puts "E: Header is not Pigato::WORKER" + return nil + end - command = msg.pop.data + command = msg.pop.data - case command - when Pigato::W_REQUEST - # We should pop and save as many addresses as there are - # up to a null part, but for now, just save one... - @reply_to = msg.pop.data - @reply_service = msg.pop.data - msg.pop # empty - @reply_rid = msg.pop.data - val = Oj.load(msg.pop.data) # We have a request to process - return val - when Pigato::W_HEARTBEAT - # do nothing - when Pigato::W_DISCONNECT - start - else - end + case command + when Pigato::W_REQUEST + @reply_to = msg.pop.data + @reply_service = msg.pop.data + msg.pop # empty + @reply_rid = msg.pop.data + val = Oj.load(msg.pop.data) + when Pigato::W_HEARTBEAT + when Pigato::W_DISCONNECT + start else - @liveness -= 1 - if @liveness == 0 - sleep 0.001 * @conf[:reconnect] - start - end end - - if Time.now > @heartbeat_at - send Pigato::W_HEARTBEAT - @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat] + else + @liveness -= 1 + if @liveness == 0 + sleep 0.001 * @conf[:reconnect] + start end + end + if Time.now > @heartbeat_at + send Pigato::W_HEARTBEAT + @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat] end + + val end def start stop sock_create