lib/pigato/worker.rb in pigato-0.2.27 vs lib/pigato/worker.rb in pigato-0.3.0

- old
+ new

@@ -1,37 +1,56 @@ -class Pigato::Worker +require "#{File.dirname(__FILE__)}/base.rb" +class Pigato::Worker < Pigato::Base + HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable - def initialize broker, service + def initialize broker, service, conf = {} @broker = broker @service = service - @heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds) - @liveness = 0 # How many attempts left - @timeout = 2500 - @heartbeat = 2500 # Heartbeat delay, msecs - @reconnect = 2500 # Reconnect delay, msecs + @conf = { + :autostart => false, + :timeout => 2500, + :heartbeat => 2500, + :reconnect => 2500 + } + + @conf.merge!(conf) + + @liveness = 0 + @heartbeat_at = 0 @reply_to = nil @reply_rid = nil @reply_service = nil + + init - reconnect_to_broker + if @conf[:autostart] + start + end end def reply reply reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)]) - send_to_broker Pigato::W_REPLY, reply + send Pigato::W_REPLY, reply end def recv + loop do + + iid = get_iid + + socket = get_socket + return nil if socket.nil? + @reply_rid = nil @reply_to = nil @reply_service = nil - msg = @socket.recv_message + msg = socket.recv_message if msg && msg.size @liveness = HEARTBEAT_LIVENESS header = msg.pop.data @@ -53,56 +72,53 @@ val = Oj.load(msg.pop.data) # We have a request to process return val when Pigato::W_HEARTBEAT # do nothing when Pigato::W_DISCONNECT - reconnect_to_broker + start else end else @liveness -= 1 if @liveness == 0 - sleep 0.001*@reconnect - reconnect_to_broker + sleep 0.001 * @conf[:reconnect] + start end end if Time.now > @heartbeat_at - send_to_broker Pigato::W_HEARTBEAT - @heartbeat_at = Time.now + 0.001 * @heartbeat + send Pigato::W_HEARTBEAT + @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat] end end end - def reconnect_to_broker - if @socket - @socket.close - end - if @ctx - @ctx.destroy - end - - @ctx = ZMQ::Context.new - @socket = @ctx.socket ZMQ::DEALER - @ctx.linger = 0 - @socket.identity = SecureRandom.uuid - @socket.connect @broker - @socket.rcvtimeo = @timeout; - send_to_broker Pigato::W_READY, @service + def start + stop + sock_create + send Pigato::W_READY, @service + super @liveness = HEARTBEAT_LIVENESS - @heartbeat_at = Time.now + 0.001 * @heartbeat + @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat] end - def send_to_broker command, data = nil + def stop + sock_close + super + end + + def send command, data = nil if data.nil? data = [] elsif not data.is_a?(Array) data = [data] end + socket = get_socket + data = [Pigato::W_WORKER, command].concat data msg = ZMQ::Message.new data.reverse.each{|p| msg.push(ZMQ::Frame(p))} - @socket.send_message msg + socket.send_message msg end end