lib/org-converge/engine.rb in org-converge-0.0.14 vs lib/org-converge/engine.rb in org-converge-0.0.15

- old
+ new

@@ -4,10 +4,12 @@ # require 'foreman/engine' require 'foreman/process' require 'tco' require 'fileutils' +require 'net/ssh' +require 'net/scp' module OrgConverge class Engine < Foreman::Engine attr_reader :logger @@ -83,10 +85,11 @@ end def register(name, command, options={}) options[:env] ||= env options[:cwd] ||= File.dirname(command.split(" ").first) + options[:babel] ||= @babel process = OrgConverge::CodeBlockProcess.new(command, options) @names[process] = name @processes << process end @@ -129,10 +132,11 @@ output_with_mutex name_for(pid), termination_message_for(status) unless @runmode == 'spec' @running.delete(pid) yield if block_given? pid rescue Errno::ECHILD + yield if block_given? end def termination_message_for(status) n = name_for(status.pid).split('.').first @@ -165,24 +169,32 @@ # Need to expose the options to make the process be aware # of the possible running mode (specially spec mode) # and where to put the results output class CodeBlockProcess < Foreman::Process + include OrgConverge::Helpers attr_reader :options def run(options={}) env = @options[:env].merge(options[:env] || {}) + logger = @options[:logger] output = options[:output] || $stdout runner = "#{Foreman.runner}".shellescape + @babel = @options[:babel] # whitelist the modifiers which manipulate how to the block is started block_modifiers = { } if options[:header] block_modifiers[:waitfor] = options[:header][:waitsfor] || options[:header][:waitfor] || options[:header][:sleep] block_modifiers[:timeout] = options[:header][:timeoutin] || options[:header][:timeout] || options[:header][:timeoutafter] if options[:header][:dir] - block_modifiers[:cwd] = File.expand_path(File.join(self.options[:cwd], options[:header][:dir])) + ssh_params = determine_ssh_params(options[:header][:dir]) + if ssh_params[:host] + block_modifiers[:ssh] = ssh_params + else + block_modifiers[:cwd] = File.expand_path(File.join(self.options[:cwd], options[:header][:dir])) + end end end pid = nil thread = nil @@ -202,18 +214,70 @@ end opts = { :out => output, :err => output } pid = Process.spawn env, wrapped_command, opts end + ssh_process = nil + if block_modifiers[:ssh] + ssh_process = proc do + ssh_options = { } + ssh_options[:port] = block_modifiers[:ssh][:port] + ssh_options[:password] = block_modifiers[:ssh][:password] if block_modifiers[:ssh][:password] + ssh_options[:keys] = @babel.ob.in_buffer_settings['SSHIDENTIFYFILE'] if @babel.ob.in_buffer_settings['SSHIDENTIFYFILE'] + begin + # SCP the script to run remotely and the binary used to run it + binary, script = command.split(' ') + remote_file = if not block_modifiers[:ssh][:remote_dir].empty? + File.join(block_modifiers[:ssh][:remote_dir], "org-run-#{File.basename(script)}") + else + "org-run-#{File.basename(script)}" + end + scp_options = ssh_options + scp_options[:keys] = [ssh_options[:keys]] if ssh_options[:keys] + + # TODO: Detect and upload the file only once + Net::SCP.upload!(block_modifiers[:ssh][:host], + block_modifiers[:ssh][:user], + script, + remote_file, + :ssh => scp_options) + Net::SSH.start(block_modifiers[:ssh][:host], + block_modifiers[:ssh][:user], ssh_options) do |ssh| + channel = ssh.open_channel do |chan| + chan.exec "#{binary} #{remote_file}" do |ch, success| + raise "could not execute command" unless success + + # "on_data" is called when the process writes something to stdout + # "on_extended_data" is called when the process writes something to stderr + chan.on_data { |c, data| output.puts data } + chan.on_extended_data { |c, type, data| output.puts data } + chan.on_close { output.puts "exited from #{block_modifiers[:ssh][:host]}"} + end + chan.wait + end + ssh.loop + end + rescue Net::SCP::Error + output.puts "Error when transporting file: #{script}" + rescue => e + puts "Error during ssh session: #{e}" + end + end + end + # In case we modify the run block, we run it in a Thread # otherwise we continue treating it as a forked process. - if block_modifiers and (block_modifiers[:waitfor] || block_modifiers[:timeout] || block_modifiers[:dir]) + if block_modifiers and (block_modifiers[:waitfor] || block_modifiers[:timeout] || block_modifiers[:dir] || block_modifiers[:ssh]) waitfor = block_modifiers[:waitfor].to_i timeout = block_modifiers[:timeout].to_i thread = Thread.new do sleep waitfor if waitfor > 0 - pid = process.call + if ssh_process + ssh_process.call + else + pid = process.call + end if timeout > 0 sleep timeout # FIXME: Kill children properly o = `ps -ef | awk '$3 == #{pid} { print $2 }'` o.each_line { |cpid| Process.kill(:TERM, cpid.to_i) }