lib/pigato/worker.rb in pigato-0.4.2 vs lib/pigato/worker.rb in pigato-0.4.3
- old
+ new
@@ -20,83 +20,99 @@
@liveness = 0
@heartbeat_at = 0
@reply_to = nil
@reply_rid = nil
@reply_service = nil
-
+
init
if @conf[:autostart]
start
end
+
+ Thread.new do
+ client = Pigato::Client.new(broker, { :autostart => true })
+ loop do
+ @@mtx.lock
+ begin
+ if Time.now > @@global_heartbeat_at
+ @@sockets_ids.each do |iid, sid|
+ request = [Pigato::C_CLIENT, Pigato::W_HEARTBEAT, "worker", sid]
+ msg = ZMQ::Message.new
+ request.reverse.each{|p| msg.push(ZMQ::Frame(p))}
+ client.send msg
+ end
+ @@global_heartbeat_at = Time.now + 2
+ end
+ rescue => e
+ puts e
+ end
+ @@mtx.unlock
+ sleep 2
+ end
+ end
end
def reply reply
reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)])
send Pigato::W_REPLY, reply
end
-
+
def recv
+ val = nil
- loop do
- @reply_rid = nil
- @reply_to = nil
- @reply_service = nil
+ @reply_rid = nil
+ @reply_to = nil
+ @reply_service = nil
- iid = get_iid
-
- start if @@sockets[iid] == nil && @conf[:autostart]
+ iid = get_iid
- socket = get_socket
- return nil if socket.nil?
+ start if @@sockets[iid] == nil && @conf[:autostart]
- socket.rcvtimeo = @conf[:timeout]
+ socket = get_socket
+ return nil if socket.nil?
- msg = socket.recv_message
+ socket.rcvtimeo = @conf[:timeout]
- return nil if msg.nil?
+ msg = socket.recv_message
- if msg && msg.size
- @liveness = HEARTBEAT_LIVENESS
+ if msg && msg.size
+ @liveness = HEARTBEAT_LIVENESS
- header = msg.pop.data
- if header != Pigato::W_WORKER
- puts "E: Header is not Pigato::WORKER"
- next
- end
+ header = msg.pop.data
+ if header != Pigato::W_WORKER
+ puts "E: Header is not Pigato::WORKER"
+ return nil
+ end
- command = msg.pop.data
+ command = msg.pop.data
- case command
- when Pigato::W_REQUEST
- # We should pop and save as many addresses as there are
- # up to a null part, but for now, just save one...
- @reply_to = msg.pop.data
- @reply_service = msg.pop.data
- msg.pop # empty
- @reply_rid = msg.pop.data
- val = Oj.load(msg.pop.data) # We have a request to process
- return val
- when Pigato::W_HEARTBEAT
- # do nothing
- when Pigato::W_DISCONNECT
- start
- else
- end
+ case command
+ when Pigato::W_REQUEST
+ @reply_to = msg.pop.data
+ @reply_service = msg.pop.data
+ msg.pop # empty
+ @reply_rid = msg.pop.data
+ val = Oj.load(msg.pop.data)
+ when Pigato::W_HEARTBEAT
+ when Pigato::W_DISCONNECT
+ start
else
- @liveness -= 1
- if @liveness == 0
- sleep 0.001 * @conf[:reconnect]
- start
- end
end
-
- if Time.now > @heartbeat_at
- send Pigato::W_HEARTBEAT
- @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat]
+ else
+ @liveness -= 1
+ if @liveness == 0
+ sleep 0.001 * @conf[:reconnect]
+ start
end
+ end
+ if Time.now > @heartbeat_at
+ send Pigato::W_HEARTBEAT
+ @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat]
end
+
+ val
end
def start
stop
sock_create