lib/pigato/worker.rb in pigato-0.2.1 vs lib/pigato/worker.rb in pigato-0.2.2

- old
+ new

@@ -1,18 +1,13 @@ -require "oj" -require "ffi-rzmq" -require "securerandom" - class Pigato::Worker HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable def initialize broker, service @broker = broker @service = service @context = ZMQ::Context.new(1) - @poller = ZMQ::Poller.new @socket = nil # Socket to broker @heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds) @liveness = 0 # How many attempts left @timeout = 2500 @heartbeat = 2500 # Heartbeat delay, msecs @@ -25,43 +20,41 @@ reconnect_to_broker end def reply reply reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)]) - send_to_broker Pigato::W_REPLY, reply, nil + send_to_broker Pigato::W_REPLY, reply end def recv reply loop do @reply_rid = nil @reply_to = nil @reply_service = nil - items = @poller.poll(@timeout) - if items - msg = [] - @socket.recv_strings msg + msg = @socket.recv_message + if msg && msg.size @liveness = HEARTBEAT_LIVENESS - header = msg.shift + header = msg.pop.data if header != Pigato::W_WORKER puts "E: Header is not Pigato::WORKER" next end - command = msg.shift + command = msg.pop.data case command when Pigato::W_REQUEST # We should pop and save as many addresses as there are # up to a null part, but for now, just save one... - @reply_to = msg.shift - @reply_service = msg.shift - msg.shift # empty - @reply_rid = msg.shift - val = Oj.load(msg[0]) # We have a request to process + @reply_to = msg.pop.data + @reply_service = msg.pop.data + msg.pop # empty + @reply_rid = msg.pop.data + val = Oj.load(msg.pop.data) # We have a request to process return val when Pigato::W_HEARTBEAT # do nothing when Pigato::W_DISCONNECT reconnect_to_broker @@ -83,32 +76,31 @@ end end def reconnect_to_broker if @socket - @poller.deregister @socket, ZMQ::DEALER @socket.close end @socket = @context.socket ZMQ::DEALER - @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid - @socket.setsockopt ZMQ::LINGER, 0 + @context.linger = 0 + @socket.identity = SecureRandom.uuid @socket.connect @broker - @poller.register @socket, ZMQ::POLLIN - send_to_broker(Pigato::W_READY, @service, []) + @socket.rcvtimeo = @timeout; + send_to_broker Pigato::W_READY, @service @liveness = HEARTBEAT_LIVENESS @heartbeat_at = Time.now + 0.001 * @heartbeat end - def send_to_broker command, message=nil, options=nil - if message.nil? - message = [] - elsif not message.is_a?(Array) - message = [message] + def send_to_broker command, data = nil + if data.nil? + data = [] + elsif not data.is_a?(Array) + data = [data] end - message = [Pigato::W_WORKER, command].concat message - message = message.concat(options) if options - - @socket.send_strings message + data = [Pigato::W_WORKER, command].concat data + msg = ZMQ::Message.new + data.reverse.each{|p| msg.push(ZMQ::Frame(p))} + @socket.send_message msg end end