lib/org-converge/engine.rb in org-converge-0.0.11 vs lib/org-converge/engine.rb in org-converge-0.0.12
- old
+ new
@@ -17,42 +17,63 @@
def initialize(options={})
super(options)
@logger = options[:logger] || Logger.new(STDOUT)
@babel = options[:babel]
@runmode = options[:runmode]
+
+ # Code blocks whose start invocation is manipulated run inside a thread
+ @threads = []
+ @running_threads = { }
end
# We allow other processes to exit with 0 status
# to continue with the runlist
def start
register_signal_handlers
spawn_processes
watch_for_output
sleep 0.1
begin
- status = watch_for_termination
- end while @running.count > 0
+ status = watch_for_termination do
+ @threads.each do |t|
+ unless t.alive?
+ t.exit
+ @running_threads.delete(t.__id__)
+ end
+ end
+ end
+ end while (@running.count > 0 or @running_threads.count > 0)
end
# Overriden: we do not consider process formations
def spawn_processes
@processes.each do |process|
reader, writer = create_pipe
+ pid = nil
+ thread = nil
begin
# In case of spec mode, we need to redirect the output to a results file instead
writer = File.open(process.options[:results], 'a') if @runmode == 'spec'
- pid = process.run(:output => writer)
- @names[process] = "#{@names[process]}.#{pid}"
+ pid, thread = process.run(:output => writer, :header => process.options[:header])
+ @names[process] = "#{@names[process]}.#{pid || thread.__id__}"
# NOTE: In spec mode we need to be more strict on what is flushed by the engine
# because we will be comparing the output
- writer.puts "started with pid #{pid}" unless @runmode == 'spec'
+ unless @runmode == 'spec'
+ writer.puts "started with pid #{pid}" if pid
+ writer.puts "started thread with tid #{thread.__id__}" if thread
+ end
rescue Errno::ENOENT
writer.puts "unknown command: #{process.command}" unless @runmode == 'spec'
end
- @running[pid] = [process]
- @readers[pid] = reader
+
+ @running[pid] = [process] if pid
+ @readers[pid || thread.__id__] = reader
+ if thread
+ @threads << thread
+ @running_threads[thread.__id__] = [process]
+ end
end
end
def register(name, command, options={})
options[:env] ||= env
@@ -63,27 +84,28 @@
end
def output(name, data)
data.to_s.lines.map(&:chomp).each do |message|
# FIXME: In case the process has finished before its lines where flushed
+ output = "#{name} -- #{message}"
ps, pid = name.empty? ? '<defunct>' : name.split('.')
- output = "#{pad_process_name(ps)}(#{pid})".fg get_color_for_pid(pid.to_i)
+ output = "#{pad_process_name(ps)}".fg get_color_for_pid(pid.to_i)
output += " -- "
output += message
- # FIXME: When the process has stopped already, the name of the process does not appear
+ # FIXME: When the process has stopped already, the name of the process/thread does not appear
# (which means that this approach is wrong from the beginning probably)
logger.info output
end
rescue Errno::EPIPE
terminate_gracefully
end
private
def name_padding
@name_padding ||= begin
- name_padding = @names.values.map { |n| n.length }.sort.last
+ name_padding = @names.values.map { |n| n.split('.').first.length }.sort.last
[ 9, name_padding ].max
end
end
def pad_process_name(name)
@@ -100,14 +122,72 @@
@running.delete(pid)
yield if block_given?
pid
rescue Errno::ECHILD
end
+
+ def name_for(pid)
+ process = nil
+ index = nil
+ if @running[pid]
+ process, index = @running[pid]
+ elsif @running_threads[pid]
+ process, index = @running_threads[pid]
+ end
+ name_for_index(process, index)
+ end
+
+ def name_for_index(process, index)
+ [ @names[process], index.to_s ].compact.join(".")
+ end
end
# Need to expose the options to make the process be aware
# of the possible running mode (specially spec mode)
# and where to put the results output
+ require 'timeout'
class CodeBlockProcess < Foreman::Process
attr_reader :options
+
+ def run(options={})
+ env = @options[:env].merge(options[:env] || {})
+ output = options[:output] || $stdout
+ runner = "#{Foreman.runner}".shellescape
+
+ # whitelist the modifiers which manipulate how to the block is started
+ block_modifiers = { }
+ if options[:header]
+ block_modifiers[:waitfor] = options[:header][:waitsfor] || options[:header][:waitfor] || options[:header][:sleep]
+ block_modifiers[:timeout] = options[:header][:timeoutin] || options[:header][:timeout] || options[:header][:timeoutafter]
+ end
+
+ pid = nil
+ thread = nil
+ process = nil
+
+ if block_modifiers and (block_modifiers[:waitfor] || block_modifiers[:timeout])
+ thread = Thread.new do
+ waitfor = block_modifiers[:waitfor].to_i
+ timeout = block_modifiers[:timeout].to_i
+ process = proc do
+ sleep waitfor if waitfor > 0
+ wrapped_command = "exec #{runner} -d '#{cwd}' -p -- #{command}"
+ pid = Process.spawn env, wrapped_command, :out => output, :err => output
+ end
+ timeout > 0 ? Timeout::timeout(timeout, &process) : process.call
+ end
+ else
+ if Foreman.windows?
+ Dir.chdir(cwd) do
+ pid = Process.spawn env, expanded_command(env), :out => output, :err => output
+ end
+ else
+ wrapped_command = "exec #{runner} -d '#{cwd}' -p -- #{command}"
+ pid = Process.spawn env, wrapped_command, :out => output, :err => output
+ end
+ end
+
+ # In case of thread, pid will be nil
+ return pid, thread
+ end
end
end