module Hydra #:nodoc: # Hydra class responsible to dispatching runners and communicating with the master. # # The Worker is never run directly by a user. Workers are created by a # Master to delegate to Runners. # # The general convention is to have one Worker per machine on a distributed # network. class Worker include Hydra::Messages::Worker traceable('WORKER') # Create a new worker. # * io: The IO object to use to communicate with the master # * num_runners: The number of runners to launch def initialize(opts = {}) @verbose = opts.fetch(:verbose) { false } @io = opts.fetch(:io) { raise "No IO Object" } @runners = [] @listeners = [] boot_runners(opts.fetch(:runners) { 1 }) process_messages @runners.each{|r| Process.wait r[:pid] } end # message handling methods # When a runner wants a file, it hits this method with a message. # Then the worker bubbles the file request up to the master. def request_file(message, runner) @io.write(RequestFile.new) runner[:idle] = true end # When the master sends a file down to the worker, it hits this # method. Then the worker delegates the file down to a runner. def delegate_file(message) runner = idle_runner runner[:idle] = false runner[:io].write(RunFile.new(eval(message.serialize))) end # When a runner finishes, it sends the results up to the worker. Then the # worker sends the results up to the master. def relay_results(message, runner) runner[:idle] = true @io.write(Results.new(eval(message.serialize))) end # When a master issues a shutdown order, it hits this method, which causes # the worker to send shutdown messages to its runners. def shutdown @running = false trace "Notifying #{@runners.size} Runners of Shutdown" @runners.each do |r| trace "Sending Shutdown to Runner" trace "\t#{r.inspect}" r[:io].write(Shutdown.new) end Thread.exit end private def boot_runners(num_runners) #:nodoc: trace "Booting #{num_runners} Runners" num_runners.times do pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child Hydra::Runner.new(:io => pipe, :verbose => @verbose) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } end trace "#{@runners.size} Runners booted" end # Continuously process messages def process_messages #:nodoc: trace "Processing Messages" @running = true Thread.abort_on_exception = true process_messages_from_master process_messages_from_runners @listeners.each{|l| l.join } @io.close trace "Done processing messages" end def process_messages_from_master @listeners << Thread.new do while @running begin message = @io.gets if message trace "Received Message from Master" trace "\t#{message.inspect}" message.handle(self) else trace "Nothing from Master, Pinging" @io.write Ping.new end rescue IOError => ex trace "Worker lost Master" Thread.exit end end end end def process_messages_from_runners @runners.each do |r| @listeners << Thread.new do while @running begin message = r[:io].gets if message trace "Received Message from Runner" trace "\t#{message.inspect}" message.handle(self, r) end rescue IOError => ex trace "Worker lost Runner [#{r.inspect}]" Thread.exit end end end end end # Get the next idle runner def idle_runner #:nodoc: idle_r = nil while idle_r.nil? idle_r = @runners.detect{|runner| runner[:idle]} sleep(1) end return idle_r end end end