lib/hydra/worker.rb in hydra-0.6.0 vs lib/hydra/worker.rb in hydra-0.7.0

- old
+ new

@@ -6,10 +6,11 @@ # # The general convention is to have one Worker per machine on a distributed # network. class Worker include Hydra::Messages::Worker + traceable('WORKER') # Create a new worker. # * io: The IO object to use to communicate with the master # * num_runners: The number of runners to launch def initialize(opts = {}) @verbose = opts.fetch(:verbose) { false } @@ -50,64 +51,65 @@ # When a master issues a shutdown order, it hits this method, which causes # the worker to send shutdown messages to its runners. def shutdown @running = false - $stdout.write "WORKER| Notifying #{@runners.size} Runners of Shutdown\n" if @verbose + trace "Notifying #{@runners.size} Runners of Shutdown" @runners.each do |r| - $stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose - $stdout.write " | #{r.inspect}\n" if @verbose + trace "Sending Shutdown to Runner" + trace "\t#{r.inspect}" r[:io].write(Shutdown.new) end Thread.exit end private def boot_runners(num_runners) #:nodoc: - $stdout.write "WORKER| Booting #{num_runners} Runners\n" if @verbose + trace "Booting #{num_runners} Runners" num_runners.times do pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child Hydra::Runner.new(:io => pipe, :verbose => @verbose) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } end - $stdout.write "WORKER| #{@runners.size} Runners booted\n" if @verbose + trace "#{@runners.size} Runners booted" end # Continuously process messages def process_messages #:nodoc: - $stdout.write "WORKER| Processing Messages\n" if @verbose + trace "Processing Messages" @running = true Thread.abort_on_exception = true process_messages_from_master process_messages_from_runners @listeners.each{|l| l.join } @io.close - $stdout.write "WORKER| Done processing messages\n" if @verbose + trace "Done processing messages" end def process_messages_from_master @listeners << Thread.new do while @running begin message = @io.gets if message - $stdout.write "WORKER| Received Message from Master\n" if @verbose - $stdout.write " | #{message.inspect}\n" if @verbose + trace "Received Message from Master" + trace "\t#{message.inspect}" message.handle(self) else + trace "Nothing from Master, Pinging" @io.write Ping.new end rescue IOError => ex - $stderr.write "Worker lost Master\n" if @verbose + trace "Worker lost Master" Thread.exit end end end end @@ -117,15 +119,15 @@ @listeners << Thread.new do while @running begin message = r[:io].gets if message - $stdout.write "WORKER| Received Message from Runner\n" if @verbose - $stdout.write " | #{message.inspect}\n" if @verbose + trace "Received Message from Runner" + trace "\t#{message.inspect}" message.handle(self, r) end rescue IOError => ex - $stderr.write "Worker lost Runner [#{r.inspect}]\n" if @verbose + trace "Worker lost Runner [#{r.inspect}]" Thread.exit end end end end