lib/pigato/worker.rb in pigato-0.2.1 vs lib/pigato/worker.rb in pigato-0.2.2
- old
+ new
@@ -1,18 +1,13 @@
-require "oj"
-require "ffi-rzmq"
-require "securerandom"
-
class Pigato::Worker
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
def initialize broker, service
@broker = broker
@service = service
@context = ZMQ::Context.new(1)
- @poller = ZMQ::Poller.new
@socket = nil # Socket to broker
@heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
@liveness = 0 # How many attempts left
@timeout = 2500
@heartbeat = 2500 # Heartbeat delay, msecs
@@ -25,43 +20,41 @@
reconnect_to_broker
end
def reply reply
reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)])
- send_to_broker Pigato::W_REPLY, reply, nil
+ send_to_broker Pigato::W_REPLY, reply
end
def recv reply
loop do
@reply_rid = nil
@reply_to = nil
@reply_service = nil
- items = @poller.poll(@timeout)
- if items
- msg = []
- @socket.recv_strings msg
+ msg = @socket.recv_message
+ if msg && msg.size
@liveness = HEARTBEAT_LIVENESS
- header = msg.shift
+ header = msg.pop.data
if header != Pigato::W_WORKER
puts "E: Header is not Pigato::WORKER"
next
end
- command = msg.shift
+ command = msg.pop.data
case command
when Pigato::W_REQUEST
# We should pop and save as many addresses as there are
# up to a null part, but for now, just save one...
- @reply_to = msg.shift
- @reply_service = msg.shift
- msg.shift # empty
- @reply_rid = msg.shift
- val = Oj.load(msg[0]) # We have a request to process
+ @reply_to = msg.pop.data
+ @reply_service = msg.pop.data
+ msg.pop # empty
+ @reply_rid = msg.pop.data
+ val = Oj.load(msg.pop.data) # We have a request to process
return val
when Pigato::W_HEARTBEAT
# do nothing
when Pigato::W_DISCONNECT
reconnect_to_broker
@@ -83,32 +76,31 @@
end
end
def reconnect_to_broker
if @socket
- @poller.deregister @socket, ZMQ::DEALER
@socket.close
end
@socket = @context.socket ZMQ::DEALER
- @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
- @socket.setsockopt ZMQ::LINGER, 0
+ @context.linger = 0
+ @socket.identity = SecureRandom.uuid
@socket.connect @broker
- @poller.register @socket, ZMQ::POLLIN
- send_to_broker(Pigato::W_READY, @service, [])
+ @socket.rcvtimeo = @timeout;
+ send_to_broker Pigato::W_READY, @service
@liveness = HEARTBEAT_LIVENESS
@heartbeat_at = Time.now + 0.001 * @heartbeat
end
- def send_to_broker command, message=nil, options=nil
- if message.nil?
- message = []
- elsif not message.is_a?(Array)
- message = [message]
+ def send_to_broker command, data = nil
+ if data.nil?
+ data = []
+ elsif not data.is_a?(Array)
+ data = [data]
end
- message = [Pigato::W_WORKER, command].concat message
- message = message.concat(options) if options
-
- @socket.send_strings message
+ data = [Pigato::W_WORKER, command].concat data
+ msg = ZMQ::Message.new
+ data.reverse.each{|p| msg.push(ZMQ::Frame(p))}
+ @socket.send_message msg
end
end