lib/pigato/worker.rb in pigato-0.2.27 vs lib/pigato/worker.rb in pigato-0.3.0
- old
+ new
@@ -1,37 +1,56 @@
-class Pigato::Worker
+require "#{File.dirname(__FILE__)}/base.rb"
+class Pigato::Worker < Pigato::Base
+
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
- def initialize broker, service
+ def initialize broker, service, conf = {}
@broker = broker
@service = service
- @heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
- @liveness = 0 # How many attempts left
- @timeout = 2500
- @heartbeat = 2500 # Heartbeat delay, msecs
- @reconnect = 2500 # Reconnect delay, msecs
+ @conf = {
+ :autostart => false,
+ :timeout => 2500,
+ :heartbeat => 2500,
+ :reconnect => 2500
+ }
+
+ @conf.merge!(conf)
+
+ @liveness = 0
+ @heartbeat_at = 0
@reply_to = nil
@reply_rid = nil
@reply_service = nil
+
+ init
- reconnect_to_broker
+ if @conf[:autostart]
+ start
+ end
end
def reply reply
reply = [@reply_to, '', @reply_rid, '0'].concat([Oj.dump(reply)])
- send_to_broker Pigato::W_REPLY, reply
+ send Pigato::W_REPLY, reply
end
def recv
+
loop do
+
+ iid = get_iid
+
+ socket = get_socket
+ return nil if socket.nil?
+
@reply_rid = nil
@reply_to = nil
@reply_service = nil
- msg = @socket.recv_message
+ msg = socket.recv_message
if msg && msg.size
@liveness = HEARTBEAT_LIVENESS
header = msg.pop.data
@@ -53,56 +72,53 @@
val = Oj.load(msg.pop.data) # We have a request to process
return val
when Pigato::W_HEARTBEAT
# do nothing
when Pigato::W_DISCONNECT
- reconnect_to_broker
+ start
else
end
else
@liveness -= 1
if @liveness == 0
- sleep 0.001*@reconnect
- reconnect_to_broker
+ sleep 0.001 * @conf[:reconnect]
+ start
end
end
if Time.now > @heartbeat_at
- send_to_broker Pigato::W_HEARTBEAT
- @heartbeat_at = Time.now + 0.001 * @heartbeat
+ send Pigato::W_HEARTBEAT
+ @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat]
end
end
end
- def reconnect_to_broker
- if @socket
- @socket.close
- end
- if @ctx
- @ctx.destroy
- end
-
- @ctx = ZMQ::Context.new
- @socket = @ctx.socket ZMQ::DEALER
- @ctx.linger = 0
- @socket.identity = SecureRandom.uuid
- @socket.connect @broker
- @socket.rcvtimeo = @timeout;
- send_to_broker Pigato::W_READY, @service
+ def start
+ stop
+ sock_create
+ send Pigato::W_READY, @service
+ super
@liveness = HEARTBEAT_LIVENESS
- @heartbeat_at = Time.now + 0.001 * @heartbeat
+ @heartbeat_at = Time.now + 0.001 * @conf[:heartbeat]
end
- def send_to_broker command, data = nil
+ def stop
+ sock_close
+ super
+ end
+
+ def send command, data = nil
if data.nil?
data = []
elsif not data.is_a?(Array)
data = [data]
end
+ socket = get_socket
+
data = [Pigato::W_WORKER, command].concat data
msg = ZMQ::Message.new
data.reverse.each{|p| msg.push(ZMQ::Frame(p))}
- @socket.send_message msg
+ socket.send_message msg
end
end