lib/pigato/worker.rb in pigato-0.1.3 vs lib/pigato/worker.rb in pigato-0.1.5
- old
+ new
@@ -1,6 +1,6 @@
-require "json"
+require "oj"
require "ffi-rzmq"
require "securerandom"
class PigatoWorker
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
@@ -23,11 +23,11 @@
reconnect_to_broker
end
def reply reply
- reply = [@reply_to, '', @reply_rid, '0'].concat([reply.to_json])
+ reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)])
send_to_broker Pigato::W_REPLY, reply, nil
end
def recv reply
loop do
@@ -35,31 +35,32 @@
@reply_to = nil
@reply_service = nil
items = @poller.poll(@timeout)
if items
- messages = []
- @worker.recv_strings messages
+ msg = []
+ @worker.recv_strings msg
@liveness = HEARTBEAT_LIVENESS
- header = messages.shift
+ header = msg.shift
if header != Pigato::W_WORKER
puts "E: Header is not Pigato::WORKER"
+ next
end
- command = messages.shift
+ command = msg.shift
case command
when Pigato::W_REQUEST
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one...
- puts "REQUEST"
- @reply_to = messages.shift
- @reply_service = messages.shift
- messages.shift # empty
- @reply_rid = messages.shift
- return messages[0] # We have a request to process
+ @reply_to = msg.shift
+ @reply_service = msg.shift
+ msg.shift # empty
+ @reply_rid = msg.shift
+ val = Oj.load(msg[0]) # We have a request to process
+ return val
when Pigato::W_HEARTBEAT
# do nothing
when Pigato::W_DISCONNECT
reconnect_to_broker
else