lib/remote_run/runner.rb in remote_run-0.1.0 vs lib/remote_run/runner.rb in remote_run-0.1.1
- old
+ new
@@ -1,233 +1,155 @@
-class Runner
- attr_accessor :remote_path, :local_path, :login_as, :exclude, :temp_path
- attr_reader :local_hostname, :identifier
- @@start_time = Time.now
- @@stty_config = `stty -g`
+module RemoteRun
+ class Runner
+ def initialize(configuration)
+ @configuration = configuration
+ @results = []
+ @children = []
+ @failed = []
+ @stty_config = `stty -g`
+ @last_timestamp = Time.now.strftime("%S")[0]
+ @hosts = []
- def initialize
- @task_manager = TaskManager.new
- @host_manager = HostManager.new
-
- # config options
- @local_path = Dir.getwd
- @login_as = `whoami`.strip
- @remote_path = "/tmp/remote"
- @exclude = []
- @temp_path = "/tmp/remote"
-
- # used in the runner
- @identifier = `echo $RANDOM`.strip
- @local_hostname = `hostname`.strip
- @results = []
- @children = []
- @failed = []
- @last_timestamp = Time.now.strftime("%S")[0]
-
- $runner = self
- yield self
- end
-
- def self.run(&block)
- @@start_time = Time.now
- runner = new(&block)
- runner.run
- end
-
- def self.run_time
- minutes = ((Time.now - @@start_time) / 60).to_i
- seconds = ((Time.now - @@start_time) % 60).to_i
- "#{minutes}:#{"%02d" % seconds}"
- end
-
- def self.log(message, color = :yellow)
- highline = HighLine.new
- system("stty #{@@stty_config} 2>/dev/null")
- highline.say(highline.color("[Remote :: #{$runner.identifier} :: #{run_time}] #{message}", color))
- end
-
- def hosts
- @host_manager.all
- end
-
- def hosts=(hostnames)
- hostnames.each do |hostname|
- @host_manager.add(hostname)
+ @task_manager = @configuration.task_manager
+ @host_manager = @configuration.host_manager
+ @starting_number_of_tasks = @task_manager.count
end
- end
- def tasks=(shell_commands)
- shell_commands.each do |shell_command|
- @task_manager.add(shell_command)
+ def run
+ setup_unlock_on_exit
+ start_ssh_master_connections
+ sync_working_copy_to_temp_location
+ start_tasks
+ wait_for_tasks_to_finish
+ handle_results
end
- end
- def run
- @host_manager.unlock_on_exit
- @host_manager.start_ssh_master_connections
- sync_working_copy_to_temp_location
- hosts = []
+ private
- Runner.log("Starting tasks... #{Time.now}")
-
- @starting_number_of_tasks = @task_manager.count
- while @task_manager.has_more_tasks?
- hosts = @host_manager.hosts.dup if hosts.empty?
-
- display_log
- check_for_finished
-
- if host = hosts.sample
- hosts.delete(host)
- if host.lock
- task = @task_manager.find_task
- @children << fork do
- begin
- this_host = host.dup
- unless this_host.copy_codebase
- @task_manager.add(task)
- status = 0
- end
- status = this_host.run(task)
- host.unlock
- Runner.log("#{host.hostname} failed.", :red) if status != 0
- rescue Errno::EPIPE
- Runner.log("broken pipe on #{host.hostname}...")
- ensure
- Process.exit!(status)
- end
+ def setup_unlock_on_exit
+ at_exit do
+ @configuration.hosts.each do |host|
+ begin
+ host.unlock
+ rescue Errno::EPIPE
end
end
end
end
- Runner.log("All tasks started... #{Time.now}")
-
- while @children.length > 0
- display_log
- check_for_finished
+ def start_ssh_master_connections
+ @configuration.hosts.each do |host|
+ host.start_ssh_master_connection
+ end
end
- failed_tasks = @results.select { |result| result != 0 }
- status_code = if failed_tasks.length == 0
- Runner.log("Task passed.", :green)
- Host::PASS
- else
- Runner.log("#{failed_tasks.length} task(s) failed.", :red)
- Host::FAIL
+ def sync_working_copy_to_temp_location
+ log("Creating temporary copy of #{@configuration.local_path} in #{@configuration.temp_path}...")
+ excludes = @configuration.exclude.map { |dir| "--exclude '#{dir}'"}
+ system("rsync --delete --delete-excluded #{excludes.join(" ")} -aq #{@configuration.local_path}/ #{@configuration.temp_path}/")
+ log("Done.")
end
- Runner.log("Total Time: #{self.class.run_time} minutes.")
- status_code
- end
+ def start_tasks
+ log("Starting tasks... #{Time.now}")
- def check_for_finished
- @children.each do |child_pid|
- if Process.waitpid(child_pid, Process::WNOHANG)
- if $?.exitstatus != 0
- @failed << child_pid
- end
-
- @results << $?.exitstatus
- @children.delete(child_pid)
+ while @task_manager.has_more_tasks?
+ display_log
+ check_for_finished
+ find_lock_and_start
end
- end
- sleep(0.5)
- end
- private
-
- def sync_working_copy_to_temp_location
- Runner.log("Creating temporary copy of #{@local_path} in #{@temp_path}...")
- excludes = exclude.map { |dir| "--exclude '#{dir}'"}
- system("rsync --delete --delete-excluded #{excludes.join(" ")} -aq #{@local_path}/ #{@temp_path}/")
- Runner.log("Done.")
- end
-
- def display_log
- now = Time.now.strftime("%S")[0]
- unless now == @last_timestamp
- display_status("Waiting on #{@task_manager.count} of #{@starting_number_of_tasks} tasks to start.") if @task_manager.count > 0
- display_status("Waiting on #{@children.length} of #{@starting_number_of_tasks - @task_manager.count} started tasks to finish. #{@failed.size} failed.") if @children.length > 0
- $stdout.print("\n\n")
- $stdout.flush
- @last_timestamp = now
+ log("All tasks started... #{Time.now}")
end
- end
- def display_status(message)
- Runner.log(message, :yellow)
- end
-
- class HostManager
- def initialize(&block)
- @hosts = []
+ def wait_for_tasks_to_finish
+ while @children.length > 0
+ display_log
+ check_for_finished
+ end
end
- def all
- @hosts
- end
-
- def add(hostname)
- host = Host.new(hostname)
- Thread.new do
- if host.is_up?
- @hosts << host
- end
+ def handle_results
+ failed_tasks = @results.select { |result| result != 0 }
+ status_code = if failed_tasks.length == 0
+ log("Task passed.", :green)
+ Host::PASS
+ else
+ log("#{failed_tasks.length} task(s) failed.", :red)
+ Host::FAIL
end
+
+ log("Total Time: #{run_time} minutes.")
+ status_code
end
- def hosts
- while @hosts.empty?
- Runner.log("Waiting for hosts...")
- sleep(0.5)
+ def start_task(host)
+ task = @task_manager.find_task
+ @children << fork do
+ start_forked_task(host, task)
end
-
- @hosts
end
- def unlock_on_exit
- at_exit do
- all.each do |host|
- begin
- host.unlock
- rescue Errno::EPIPE
- end
+ def start_forked_task(host, task)
+ begin
+ this_host = host.dup
+ unless this_host.copy_codebase
+ @task_manager.add(task)
+ status = 0
end
+ status = this_host.run(task.command)
+ host.unlock
+ rescue Errno::EPIPE
+ ensure
+ Process.exit!(status)
end
end
- def start_ssh_master_connections
- all.each do |host|
- fork do
- host.start_ssh_master_connection
+ def find_lock_and_start
+ @hosts = @host_manager.hosts.dup if @hosts.empty?
+ if host = @hosts.sample
+ @hosts.delete(host)
+ if host.lock
+ start_task(host)
end
end
end
- end
- class TaskManager
- def initialize
- @tasks = []
+ def run_time
+ minutes = ((Time.now - @configuration.start_time) / 60).to_i
+ seconds = ((Time.now - @configuration.start_time) % 60).to_i
+ "#{minutes}:#{"%02d" % seconds}"
end
- def add(script)
- @tasks.push(script)
+ def log(message, color = :yellow)
+ unless @configuration.quiet
+ highline = HighLine.new
+ system("stty #{@stty_config} 2>/dev/null")
+ highline.say(highline.color("[Remote :: #{@configuration.identifier} :: #{run_time}] #{message}", color))
+ end
end
- def find_task
- @tasks.shift
+ def check_for_finished
+ @children.each do |child_pid|
+ if task_is_finished?(child_pid)
+ @failed << child_pid unless $?.success?
+ @results << $?.exitstatus
+ @children.delete(child_pid)
+ end
+ end
end
- def all
- @tasks
+ def task_is_finished?(pid)
+ Process.waitpid(pid, Process::WNOHANG)
end
- def count
- @tasks.length
- end
-
- def has_more_tasks?
- @tasks.size > 0
+ def display_log
+ now = Time.now.strftime("%S")[0]
+ unless now == @last_timestamp
+ log("Waiting on #{@task_manager.count} of #{@starting_number_of_tasks} tasks to start.") if @task_manager.count > 0
+ log("Waiting on #{@children.length} of #{@starting_number_of_tasks - @task_manager.count} started tasks to finish. #{@failed.size} failed.") if @children.length > 0
+ $stdout.flush
+ @last_timestamp = now
+ end
end
end
end