lib/pigato/worker.rb in pigato-0.1.8 vs lib/pigato/worker.rb in pigato-0.1.9

- old
+ new

@@ -1,19 +1,19 @@ require "oj" require "ffi-rzmq" require "securerandom" -class PigatoWorker +class Pigato::Worker HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable def initialize broker, service @broker = broker @service = service @context = ZMQ::Context.new(1) @poller = ZMQ::Poller.new - @worker = nil # Socket to broker + @socket = nil # Socket to broker @heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds) @liveness = 0 # How many attempts left @timeout = 2500 @heartbeat = 2500 # Heartbeat delay, msecs @reconnect = 2500 # Reconnect delay, msecs @@ -37,11 +37,11 @@ @reply_service = nil items = @poller.poll(@timeout) if items msg = [] - @worker.recv_strings msg + @socket.recv_strings msg @liveness = HEARTBEAT_LIVENESS header = msg.shift if header != Pigato::W_WORKER @@ -82,20 +82,20 @@ end end def reconnect_to_broker - if @worker - @poller.deregister @worker, ZMQ::DEALER - @worker.close + if @socket + @poller.deregister @socket, ZMQ::DEALER + @socket.close end - @worker = @context.socket ZMQ::DEALER - @worker.setsockopt ZMQ::IDENTITY, SecureRandom.uuid - @worker.setsockopt ZMQ::LINGER, 0 - @worker.connect @broker - @poller.register @worker, ZMQ::POLLIN + @socket = @context.socket ZMQ::DEALER + @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid + @socket.setsockopt ZMQ::LINGER, 0 + @socket.connect @broker + @poller.register @socket, ZMQ::POLLIN send_to_broker(Pigato::W_READY, @service, []) @liveness = HEARTBEAT_LIVENESS @heartbeat_at = Time.now + 0.001 * @heartbeat end @@ -107,8 +107,8 @@ end message = [Pigato::W_WORKER, command].concat message message = message.concat(options) if options - @worker.send_strings message + @socket.send_strings message end end