lib/remote_run/runner.rb in remote_run-0.1.0 vs lib/remote_run/runner.rb in remote_run-0.1.1

- old
+ new

@@ -1,233 +1,155 @@ -class Runner - attr_accessor :remote_path, :local_path, :login_as, :exclude, :temp_path - attr_reader :local_hostname, :identifier - @@start_time = Time.now - @@stty_config = `stty -g` +module RemoteRun + class Runner + def initialize(configuration) + @configuration = configuration + @results = [] + @children = [] + @failed = [] + @stty_config = `stty -g` + @last_timestamp = Time.now.strftime("%S")[0] + @hosts = [] - def initialize - @task_manager = TaskManager.new - @host_manager = HostManager.new - - # config options - @local_path = Dir.getwd - @login_as = `whoami`.strip - @remote_path = "/tmp/remote" - @exclude = [] - @temp_path = "/tmp/remote" - - # used in the runner - @identifier = `echo $RANDOM`.strip - @local_hostname = `hostname`.strip - @results = [] - @children = [] - @failed = [] - @last_timestamp = Time.now.strftime("%S")[0] - - $runner = self - yield self - end - - def self.run(&block) - @@start_time = Time.now - runner = new(&block) - runner.run - end - - def self.run_time - minutes = ((Time.now - @@start_time) / 60).to_i - seconds = ((Time.now - @@start_time) % 60).to_i - "#{minutes}:#{"%02d" % seconds}" - end - - def self.log(message, color = :yellow) - highline = HighLine.new - system("stty #{@@stty_config} 2>/dev/null") - highline.say(highline.color("[Remote :: #{$runner.identifier} :: #{run_time}] #{message}", color)) - end - - def hosts - @host_manager.all - end - - def hosts=(hostnames) - hostnames.each do |hostname| - @host_manager.add(hostname) + @task_manager = @configuration.task_manager + @host_manager = @configuration.host_manager + @starting_number_of_tasks = @task_manager.count end - end - def tasks=(shell_commands) - shell_commands.each do |shell_command| - @task_manager.add(shell_command) + def run + setup_unlock_on_exit + start_ssh_master_connections + sync_working_copy_to_temp_location + start_tasks + wait_for_tasks_to_finish + handle_results end - end - def run - @host_manager.unlock_on_exit - @host_manager.start_ssh_master_connections - sync_working_copy_to_temp_location - hosts = [] + private - Runner.log("Starting tasks... #{Time.now}") - - @starting_number_of_tasks = @task_manager.count - while @task_manager.has_more_tasks? - hosts = @host_manager.hosts.dup if hosts.empty? - - display_log - check_for_finished - - if host = hosts.sample - hosts.delete(host) - if host.lock - task = @task_manager.find_task - @children << fork do - begin - this_host = host.dup - unless this_host.copy_codebase - @task_manager.add(task) - status = 0 - end - status = this_host.run(task) - host.unlock - Runner.log("#{host.hostname} failed.", :red) if status != 0 - rescue Errno::EPIPE - Runner.log("broken pipe on #{host.hostname}...") - ensure - Process.exit!(status) - end + def setup_unlock_on_exit + at_exit do + @configuration.hosts.each do |host| + begin + host.unlock + rescue Errno::EPIPE end end end end - Runner.log("All tasks started... #{Time.now}") - - while @children.length > 0 - display_log - check_for_finished + def start_ssh_master_connections + @configuration.hosts.each do |host| + host.start_ssh_master_connection + end end - failed_tasks = @results.select { |result| result != 0 } - status_code = if failed_tasks.length == 0 - Runner.log("Task passed.", :green) - Host::PASS - else - Runner.log("#{failed_tasks.length} task(s) failed.", :red) - Host::FAIL + def sync_working_copy_to_temp_location + log("Creating temporary copy of #{@configuration.local_path} in #{@configuration.temp_path}...") + excludes = @configuration.exclude.map { |dir| "--exclude '#{dir}'"} + system("rsync --delete --delete-excluded #{excludes.join(" ")} -aq #{@configuration.local_path}/ #{@configuration.temp_path}/") + log("Done.") end - Runner.log("Total Time: #{self.class.run_time} minutes.") - status_code - end + def start_tasks + log("Starting tasks... #{Time.now}") - def check_for_finished - @children.each do |child_pid| - if Process.waitpid(child_pid, Process::WNOHANG) - if $?.exitstatus != 0 - @failed << child_pid - end - - @results << $?.exitstatus - @children.delete(child_pid) + while @task_manager.has_more_tasks? + display_log + check_for_finished + find_lock_and_start end - end - sleep(0.5) - end - private - - def sync_working_copy_to_temp_location - Runner.log("Creating temporary copy of #{@local_path} in #{@temp_path}...") - excludes = exclude.map { |dir| "--exclude '#{dir}'"} - system("rsync --delete --delete-excluded #{excludes.join(" ")} -aq #{@local_path}/ #{@temp_path}/") - Runner.log("Done.") - end - - def display_log - now = Time.now.strftime("%S")[0] - unless now == @last_timestamp - display_status("Waiting on #{@task_manager.count} of #{@starting_number_of_tasks} tasks to start.") if @task_manager.count > 0 - display_status("Waiting on #{@children.length} of #{@starting_number_of_tasks - @task_manager.count} started tasks to finish. #{@failed.size} failed.") if @children.length > 0 - $stdout.print("\n\n") - $stdout.flush - @last_timestamp = now + log("All tasks started... #{Time.now}") end - end - def display_status(message) - Runner.log(message, :yellow) - end - - class HostManager - def initialize(&block) - @hosts = [] + def wait_for_tasks_to_finish + while @children.length > 0 + display_log + check_for_finished + end end - def all - @hosts - end - - def add(hostname) - host = Host.new(hostname) - Thread.new do - if host.is_up? - @hosts << host - end + def handle_results + failed_tasks = @results.select { |result| result != 0 } + status_code = if failed_tasks.length == 0 + log("Task passed.", :green) + Host::PASS + else + log("#{failed_tasks.length} task(s) failed.", :red) + Host::FAIL end + + log("Total Time: #{run_time} minutes.") + status_code end - def hosts - while @hosts.empty? - Runner.log("Waiting for hosts...") - sleep(0.5) + def start_task(host) + task = @task_manager.find_task + @children << fork do + start_forked_task(host, task) end - - @hosts end - def unlock_on_exit - at_exit do - all.each do |host| - begin - host.unlock - rescue Errno::EPIPE - end + def start_forked_task(host, task) + begin + this_host = host.dup + unless this_host.copy_codebase + @task_manager.add(task) + status = 0 end + status = this_host.run(task.command) + host.unlock + rescue Errno::EPIPE + ensure + Process.exit!(status) end end - def start_ssh_master_connections - all.each do |host| - fork do - host.start_ssh_master_connection + def find_lock_and_start + @hosts = @host_manager.hosts.dup if @hosts.empty? + if host = @hosts.sample + @hosts.delete(host) + if host.lock + start_task(host) end end end - end - class TaskManager - def initialize - @tasks = [] + def run_time + minutes = ((Time.now - @configuration.start_time) / 60).to_i + seconds = ((Time.now - @configuration.start_time) % 60).to_i + "#{minutes}:#{"%02d" % seconds}" end - def add(script) - @tasks.push(script) + def log(message, color = :yellow) + unless @configuration.quiet + highline = HighLine.new + system("stty #{@stty_config} 2>/dev/null") + highline.say(highline.color("[Remote :: #{@configuration.identifier} :: #{run_time}] #{message}", color)) + end end - def find_task - @tasks.shift + def check_for_finished + @children.each do |child_pid| + if task_is_finished?(child_pid) + @failed << child_pid unless $?.success? + @results << $?.exitstatus + @children.delete(child_pid) + end + end end - def all - @tasks + def task_is_finished?(pid) + Process.waitpid(pid, Process::WNOHANG) end - def count - @tasks.length - end - - def has_more_tasks? - @tasks.size > 0 + def display_log + now = Time.now.strftime("%S")[0] + unless now == @last_timestamp + log("Waiting on #{@task_manager.count} of #{@starting_number_of_tasks} tasks to start.") if @task_manager.count > 0 + log("Waiting on #{@children.length} of #{@starting_number_of_tasks - @task_manager.count} started tasks to finish. #{@failed.size} failed.") if @children.length > 0 + $stdout.flush + @last_timestamp = now + end end end end