lib/pigato/worker.rb in pigato-0.1.8 vs lib/pigato/worker.rb in pigato-0.1.9
- old
+ new
@@ -1,19 +1,19 @@
require "oj"
require "ffi-rzmq"
require "securerandom"
-class PigatoWorker
+class Pigato::Worker
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
def initialize broker, service
@broker = broker
@service = service
@context = ZMQ::Context.new(1)
@poller = ZMQ::Poller.new
- @worker = nil # Socket to broker
+ @socket = nil # Socket to broker
@heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
@liveness = 0 # How many attempts left
@timeout = 2500
@heartbeat = 2500 # Heartbeat delay, msecs
@reconnect = 2500 # Reconnect delay, msecs
@@ -37,11 +37,11 @@
@reply_service = nil
items = @poller.poll(@timeout)
if items
msg = []
- @worker.recv_strings msg
+ @socket.recv_strings msg
@liveness = HEARTBEAT_LIVENESS
header = msg.shift
if header != Pigato::W_WORKER
@@ -82,20 +82,20 @@
end
end
def reconnect_to_broker
- if @worker
- @poller.deregister @worker, ZMQ::DEALER
- @worker.close
+ if @socket
+ @poller.deregister @socket, ZMQ::DEALER
+ @socket.close
end
- @worker = @context.socket ZMQ::DEALER
- @worker.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
- @worker.setsockopt ZMQ::LINGER, 0
- @worker.connect @broker
- @poller.register @worker, ZMQ::POLLIN
+ @socket = @context.socket ZMQ::DEALER
+ @socket.setsockopt ZMQ::IDENTITY, SecureRandom.uuid
+ @socket.setsockopt ZMQ::LINGER, 0
+ @socket.connect @broker
+ @poller.register @socket, ZMQ::POLLIN
send_to_broker(Pigato::W_READY, @service, [])
@liveness = HEARTBEAT_LIVENESS
@heartbeat_at = Time.now + 0.001 * @heartbeat
end
@@ -107,8 +107,8 @@
end
message = [Pigato::W_WORKER, command].concat message
message = message.concat(options) if options
- @worker.send_strings message
+ @socket.send_strings message
end
end