lib/hydra/worker.rb in hydra-0.6.0 vs lib/hydra/worker.rb in hydra-0.7.0
- old
+ new
@@ -6,10 +6,11 @@
#
# The general convention is to have one Worker per machine on a distributed
# network.
class Worker
include Hydra::Messages::Worker
+ traceable('WORKER')
# Create a new worker.
# * io: The IO object to use to communicate with the master
# * num_runners: The number of runners to launch
def initialize(opts = {})
@verbose = opts.fetch(:verbose) { false }
@@ -50,64 +51,65 @@
# When a master issues a shutdown order, it hits this method, which causes
# the worker to send shutdown messages to its runners.
def shutdown
@running = false
- $stdout.write "WORKER| Notifying #{@runners.size} Runners of Shutdown\n" if @verbose
+ trace "Notifying #{@runners.size} Runners of Shutdown"
@runners.each do |r|
- $stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose
- $stdout.write " | #{r.inspect}\n" if @verbose
+ trace "Sending Shutdown to Runner"
+ trace "\t#{r.inspect}"
r[:io].write(Shutdown.new)
end
Thread.exit
end
private
def boot_runners(num_runners) #:nodoc:
- $stdout.write "WORKER| Booting #{num_runners} Runners\n" if @verbose
+ trace "Booting #{num_runners} Runners"
num_runners.times do
pipe = Hydra::Pipe.new
child = Process.fork do
pipe.identify_as_child
Hydra::Runner.new(:io => pipe, :verbose => @verbose)
end
pipe.identify_as_parent
@runners << { :pid => child, :io => pipe, :idle => false }
end
- $stdout.write "WORKER| #{@runners.size} Runners booted\n" if @verbose
+ trace "#{@runners.size} Runners booted"
end
# Continuously process messages
def process_messages #:nodoc:
- $stdout.write "WORKER| Processing Messages\n" if @verbose
+ trace "Processing Messages"
@running = true
Thread.abort_on_exception = true
process_messages_from_master
process_messages_from_runners
@listeners.each{|l| l.join }
@io.close
- $stdout.write "WORKER| Done processing messages\n" if @verbose
+ trace "Done processing messages"
end
def process_messages_from_master
@listeners << Thread.new do
while @running
begin
message = @io.gets
if message
- $stdout.write "WORKER| Received Message from Master\n" if @verbose
- $stdout.write " | #{message.inspect}\n" if @verbose
+ trace "Received Message from Master"
+ trace "\t#{message.inspect}"
message.handle(self)
else
+ trace "Nothing from Master, Pinging"
@io.write Ping.new
end
rescue IOError => ex
- $stderr.write "Worker lost Master\n" if @verbose
+ trace "Worker lost Master"
Thread.exit
end
end
end
end
@@ -117,15 +119,15 @@
@listeners << Thread.new do
while @running
begin
message = r[:io].gets
if message
- $stdout.write "WORKER| Received Message from Runner\n" if @verbose
- $stdout.write " | #{message.inspect}\n" if @verbose
+ trace "Received Message from Runner"
+ trace "\t#{message.inspect}"
message.handle(self, r)
end
rescue IOError => ex
- $stderr.write "Worker lost Runner [#{r.inspect}]\n" if @verbose
+ trace "Worker lost Runner [#{r.inspect}]"
Thread.exit
end
end
end
end