lib/org-converge/engine.rb in org-converge-0.0.14 vs lib/org-converge/engine.rb in org-converge-0.0.15
- old
+ new
@@ -4,10 +4,12 @@
#
require 'foreman/engine'
require 'foreman/process'
require 'tco'
require 'fileutils'
+require 'net/ssh'
+require 'net/scp'
module OrgConverge
class Engine < Foreman::Engine
attr_reader :logger
@@ -83,10 +85,11 @@
end
def register(name, command, options={})
options[:env] ||= env
options[:cwd] ||= File.dirname(command.split(" ").first)
+ options[:babel] ||= @babel
process = OrgConverge::CodeBlockProcess.new(command, options)
@names[process] = name
@processes << process
end
@@ -129,10 +132,11 @@
output_with_mutex name_for(pid), termination_message_for(status) unless @runmode == 'spec'
@running.delete(pid)
yield if block_given?
pid
rescue Errno::ECHILD
+ yield if block_given?
end
def termination_message_for(status)
n = name_for(status.pid).split('.').first
@@ -165,24 +169,32 @@
# Need to expose the options to make the process be aware
# of the possible running mode (specially spec mode)
# and where to put the results output
class CodeBlockProcess < Foreman::Process
+ include OrgConverge::Helpers
attr_reader :options
def run(options={})
env = @options[:env].merge(options[:env] || {})
+ logger = @options[:logger]
output = options[:output] || $stdout
runner = "#{Foreman.runner}".shellescape
+ @babel = @options[:babel]
# whitelist the modifiers which manipulate how to the block is started
block_modifiers = { }
if options[:header]
block_modifiers[:waitfor] = options[:header][:waitsfor] || options[:header][:waitfor] || options[:header][:sleep]
block_modifiers[:timeout] = options[:header][:timeoutin] || options[:header][:timeout] || options[:header][:timeoutafter]
if options[:header][:dir]
- block_modifiers[:cwd] = File.expand_path(File.join(self.options[:cwd], options[:header][:dir]))
+ ssh_params = determine_ssh_params(options[:header][:dir])
+ if ssh_params[:host]
+ block_modifiers[:ssh] = ssh_params
+ else
+ block_modifiers[:cwd] = File.expand_path(File.join(self.options[:cwd], options[:header][:dir]))
+ end
end
end
pid = nil
thread = nil
@@ -202,18 +214,70 @@
end
opts = { :out => output, :err => output }
pid = Process.spawn env, wrapped_command, opts
end
+ ssh_process = nil
+ if block_modifiers[:ssh]
+ ssh_process = proc do
+ ssh_options = { }
+ ssh_options[:port] = block_modifiers[:ssh][:port]
+ ssh_options[:password] = block_modifiers[:ssh][:password] if block_modifiers[:ssh][:password]
+ ssh_options[:keys] = @babel.ob.in_buffer_settings['SSHIDENTIFYFILE'] if @babel.ob.in_buffer_settings['SSHIDENTIFYFILE']
+ begin
+ # SCP the script to run remotely and the binary used to run it
+ binary, script = command.split(' ')
+ remote_file = if not block_modifiers[:ssh][:remote_dir].empty?
+ File.join(block_modifiers[:ssh][:remote_dir], "org-run-#{File.basename(script)}")
+ else
+ "org-run-#{File.basename(script)}"
+ end
+ scp_options = ssh_options
+ scp_options[:keys] = [ssh_options[:keys]] if ssh_options[:keys]
+
+ # TODO: Detect and upload the file only once
+ Net::SCP.upload!(block_modifiers[:ssh][:host],
+ block_modifiers[:ssh][:user],
+ script,
+ remote_file,
+ :ssh => scp_options)
+ Net::SSH.start(block_modifiers[:ssh][:host],
+ block_modifiers[:ssh][:user], ssh_options) do |ssh|
+ channel = ssh.open_channel do |chan|
+ chan.exec "#{binary} #{remote_file}" do |ch, success|
+ raise "could not execute command" unless success
+
+ # "on_data" is called when the process writes something to stdout
+ # "on_extended_data" is called when the process writes something to stderr
+ chan.on_data { |c, data| output.puts data }
+ chan.on_extended_data { |c, type, data| output.puts data }
+ chan.on_close { output.puts "exited from #{block_modifiers[:ssh][:host]}"}
+ end
+ chan.wait
+ end
+ ssh.loop
+ end
+ rescue Net::SCP::Error
+ output.puts "Error when transporting file: #{script}"
+ rescue => e
+ puts "Error during ssh session: #{e}"
+ end
+ end
+ end
+
# In case we modify the run block, we run it in a Thread
# otherwise we continue treating it as a forked process.
- if block_modifiers and (block_modifiers[:waitfor] || block_modifiers[:timeout] || block_modifiers[:dir])
+ if block_modifiers and (block_modifiers[:waitfor] || block_modifiers[:timeout] || block_modifiers[:dir] || block_modifiers[:ssh])
waitfor = block_modifiers[:waitfor].to_i
timeout = block_modifiers[:timeout].to_i
thread = Thread.new do
sleep waitfor if waitfor > 0
- pid = process.call
+ if ssh_process
+ ssh_process.call
+ else
+ pid = process.call
+ end
if timeout > 0
sleep timeout
# FIXME: Kill children properly
o = `ps -ef | awk '$3 == #{pid} { print $2 }'`
o.each_line { |cpid| Process.kill(:TERM, cpid.to_i) }