lib/pigato/worker.rb in pigato-0.1.3 vs lib/pigato/worker.rb in pigato-0.1.5

- old
+ new

@@ -1,6 +1,6 @@ -require "json" +require "oj" require "ffi-rzmq" require "securerandom" class PigatoWorker HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable @@ -23,11 +23,11 @@ reconnect_to_broker end def reply reply - reply = [@reply_to, '', @reply_rid, '0'].concat([reply.to_json]) + reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)]) send_to_broker Pigato::W_REPLY, reply, nil end def recv reply loop do @@ -35,31 +35,32 @@ @reply_to = nil @reply_service = nil items = @poller.poll(@timeout) if items - messages = [] - @worker.recv_strings messages + msg = [] + @worker.recv_strings msg @liveness = HEARTBEAT_LIVENESS - header = messages.shift + header = msg.shift if header != Pigato::W_WORKER puts "E: Header is not Pigato::WORKER" + next end - command = messages.shift + command = msg.shift case command when Pigato::W_REQUEST # We should pop and save as many addresses as there are # up to a null part, but for now, just save one... - puts "REQUEST" - @reply_to = messages.shift - @reply_service = messages.shift - messages.shift # empty - @reply_rid = messages.shift - return messages[0] # We have a request to process + @reply_to = msg.shift + @reply_service = msg.shift + msg.shift # empty + @reply_rid = msg.shift + val = Oj.load(msg[0]) # We have a request to process + return val when Pigato::W_HEARTBEAT # do nothing when Pigato::W_DISCONNECT reconnect_to_broker else