lib/hydra/worker.rb in hydra-0.4.1 vs lib/hydra/worker.rb in hydra-0.5.0

- old
+ new

@@ -1,112 +1,143 @@ module Hydra #:nodoc: # Hydra class responsible to dispatching runners and communicating with the master. + # + # The Worker is never run directly by a user. Workers are created by a + # Master to delegate to Runners. + # + # The general convention is to have one Worker per machine on a distributed + # network. class Worker + include Hydra::Messages::Worker # Create a new worker. # * io: The IO object to use to communicate with the master # * num_runners: The number of runners to launch - def initialize(io, num_runners) - @io = io + def initialize(opts = {}) + @verbose = opts.fetch(:verbose) { false } + @io = opts.fetch(:io) { raise "No IO Object" } @runners = [] @listeners = [] - boot_runners(num_runners) + + boot_runners(opts.fetch(:runners) { 1 }) process_messages + @runners.each{|r| Process.wait r[:pid] } end # message handling methods # When a runner wants a file, it hits this method with a message. # Then the worker bubbles the file request up to the master. def request_file(message, runner) - @io.write(Hydra::Messages::Worker::RequestFile.new) + @io.write(RequestFile.new) runner[:idle] = true end # When the master sends a file down to the worker, it hits this # method. Then the worker delegates the file down to a runner. def delegate_file(message) - r = idle_runner - r[:idle] = false - r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize))) + runner = idle_runner + runner[:idle] = false + runner[:io].write(RunFile.new(eval(message.serialize))) end # When a runner finishes, it sends the results up to the worker. Then the # worker sends the results up to the master. - # TODO: when we relay results, it should trigger a RunFile or Shutdown from - # the master implicitly def relay_results(message, runner) runner[:idle] = true - @io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize))) + @io.write(Results.new(eval(message.serialize))) end # When a master issues a shutdown order, it hits this method, which causes # the worker to send shutdown messages to its runners. - # TODO: implement a ShutdownComplete message, so that we can kill the - # processes if necessary. def shutdown @running = false + $stdout.write "WORKER| Notifying #{@runners.size} Runners of Shutdown\n" if @verbose @runners.each do |r| - r[:io].write(Hydra::Messages::Runner::Shutdown.new) + $stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose + $stdout.write " | #{r.inspect}\n" if @verbose + r[:io].write(Shutdown.new) end + Thread.exit end private def boot_runners(num_runners) #:nodoc: + $stdout.write "WORKER| Booting #{num_runners} Runners\n" if @verbose num_runners.times do pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child - Hydra::Runner.new(pipe) + Hydra::Runner.new(:io => pipe, :verbose => @verbose) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } end + $stdout.write "WORKER| #{@runners.size} Runners booted\n" if @verbose end # Continuously process messages def process_messages #:nodoc: + $stdout.write "WORKER| Processing Messages\n" if @verbose @running = true - # Abort the worker if one of the runners has an exception - # TODO: catch this exception, return a dying message to the master - # then shutdown Thread.abort_on_exception = true - # Worker listens and handles messages + process_messages_from_master + process_messages_from_runners + + @listeners.each{|l| l.join } + @io.close + $stdout.write "WORKER| Done processing messages\n" if @verbose + end + + def process_messages_from_master @listeners << Thread.new do while @running - message = @io.gets - message.handle(self) if message + begin + message = @io.gets + if message + $stdout.write "WORKER| Received Message from Master\n" if @verbose + $stdout.write " | #{message.inspect}\n" if @verbose + message.handle(self) + else + @io.write Ping.new + end + rescue IOError => ex + $stderr.write "Worker lost Master\n" if @verbose + Thread.exit + end end end + end - # Runners listen, but when they handle they pass themselves - # so we can reference them when we deal with their messages + def process_messages_from_runners @runners.each do |r| @listeners << Thread.new do while @running begin message = r[:io].gets - message.handle(self, r) if message + if message + $stdout.write "WORKER| Received Message from Runner\n" if @verbose + $stdout.write " | #{message.inspect}\n" if @verbose + message.handle(self, r) + end rescue IOError => ex - # If the other end of the pipe closes - # we will continue, because we're probably - # not @running anymore + $stderr.write "Worker lost Runner [#{r.inspect}]\n" + Thread.exit end end end end - @listeners.each{|l| l.join } end # Get the next idle runner def idle_runner #:nodoc: idle_r = nil while idle_r.nil? - idle_r = @runners.detect{|r| r[:idle]} + idle_r = @runners.detect{|runner| runner[:idle]} sleep(1) end return idle_r end end