lib/nuri/orchestrator.rb in nuri-0.5.2 vs lib/nuri/orchestrator.rb in nuri-0.5.3
- old
+ new
@@ -1,33 +1,35 @@
require 'thread'
module Nuri::Orchestrator
include Nuri::Helper
- def execute_plan(p={})
- raise Exception, "Plan file is not exist!" if not File.exist?(p[:execute].to_s) and !p[:plan]
- raise Exception, "Plan is not exist (parameter :plan must be given)!" if !p[:plan]
+ def execute_plan(options={})
+ raise Exception, "Plan file is not exist!" if not File.exist?(options[:execute].to_s) and !options[:plan]
+ raise Exception, "Plan is not exist (parameter :plan must be given)!" if !options[:plan]
push_agents_list
success = false
benchmark = Benchmark.measure {
- plan = (p[:plan] ? p[:plan] : JSON[File.read(p[:execute])])
- raise Exception, "Invalid plan!" if plan['workflow'].nil?
+ plan = (options[:plan] ? options[:plan] : JSON[File.read(options[:execute])])
+ raise Exception, "No plan." if plan['workflow'].nil?
if plan.is_a?(Hash) and plan['type'] == 'sequential'
- success = execute_sequential_plan(plan, p)
+ success = execute_sequential_plan(plan, options)
elsif plan.is_a?(Hash) and plan['type'] == 'parallel'
- success = execute_parallel_plan(plan, p)
+ success = execute_parallel_plan(plan, options)
+ else
+ raise Exception, "Invalid plan."
end
}
- puts "Execution time (s): #{benchmark}"
+ puts "Execution " + format_benchmark(benchmark)
success
end
protected
- def send_action_data(action, address, port)
+ def send_action_data(action, address, port, options={})
action['parameters'].each do |name,value|
next if !value.is_a?(String) or !value.isref or value.split('.').length > 2
_, target_name = value.split('.', 2)
data = {'model' => Sfp::Helper.deep_clone(@model[target_name])}
data.accept(Sfp::Visitor::ParentEliminator.new)
@@ -36,19 +38,20 @@
begin
code, _ = put_data(address, port, "/model/cache/#{target_name}", data)
rescue
end
if code != '200'
- $stderr.puts "Sending action data of #{value} to #{address}:#{port} [Failed]".red
+ $stderr.print "Sending action data of #{value} to #{address}:#{port} "
+ $stderr.puts (options[:color] ? "[Failed]".red : "[Failed]")
return false
end
end
true
end
- def execute_action(action)
+ def execute_action(action, options={})
_, agent_name, _ = action['name'].split('.', 3)
agents = get_agents
return false if !agents[agent_name]['sfpAddress'].is_a?(String)
@@ -56,11 +59,11 @@
port = agents[agent_name]['sfpPort'].to_s
raise Exception, "Cannot find address:port of agent #{agent_name}" if
address.length <= 0 or port.length <= 0
- send_action_data(action, address, port)
+ send_action_data(action, address, port, options)
data = {'action' => JSON.generate(action)}
code, _ = post_data(address, port, '/execute', data)
if code.to_i == 200
# if the action is "create_vm" or "delete_vm", then
@@ -95,26 +98,26 @@
push_agents_list
}
end
- def execute_sequential_plan(plan, p)
+ def execute_sequential_plan(plan, options)
begin
index = 1
plan['workflow'].each do |action|
- print "#{index}. #{action['name']} #{JSON.generate(action['parameters'])}... "
- if execute_action(action)
- puts "[OK]".green
+ puts "#{index}. #{action['name']} #{JSON.generate(action['parameters'])} " + (options[:color] ? "[Wait]".yellow : "[Wait]")
+ if execute_action(action, options)
+ puts "#{index}. #{action['name']} #{JSON.generate(action['parameters'])} " + (options[:color] ? "[OK]".green : "[OK]")
else
- puts "[Failed]".red
+ puts "#{index}. #{action['name']} #{JSON.generate(action['parameters'])} " + (options[:color] ? "[Failed]".red : "[Failed]")
return false
end
index += 1
end
return true
rescue Exception => e
- puts "#{e}\n#{e.backtrace.join("\n")}".red
+ $stderr.puts "#{e}\n#{e.backtrace.join("\n")}"
end
false
end
def execute_parallel_plan(plan, options)
@@ -138,42 +141,43 @@
id = 0
@mutex.synchronize { @thread_id = id = @thread_id + 1 }
id
end
- def assign_action_with_id(id)
+ def assign_action_with_id(id, options={})
thread_id = next_thread_id
action = @actions[id]
action[:executor] = thread_id
- self.thread_execute_action(thread_id, action)
+ self.thread_execute_action(thread_id, action, options)
end
- def thread_execute_action(tid, action)
+ def thread_execute_action(tid, action, options={})
t = Thread.new {
# Register the thread
@mutex.synchronize { @threads << tid }
while not @failed and not action[:executed]
# Try to execute the action
- puts "Executing #{action[:string]} - thread ##{tid} [WAIT]".yellow
+ puts "Executing #{action[:string]} - thread ##{tid} " + (options[:color] ? "[Wait]".yellow : "[Wait]")
success = false
1.upto(@retries) do |i|
begin
- success = execute_action(action)
+ success = execute_action(action, options)
rescue Exception => exp
- puts "Executing(#{i}) #{action[:string]} - thread ##{tid} [FAILED]\n#{exp}\n#{exp.backtrace.join("\n")}"
+ puts "Executing(#{i}) #{action[:string]} - thread ##{tid} " + (options[:color] ? "[Failed]".red : "[Failed]")
+ puts "#{exp}\n#{exp.backtrace.join("\n")}"
end
break if success
end
# Check if execution failed
if success
# Execution was success
- puts "Executing #{action[:string]} - thread ##{tid} [OK]".green
+ puts "Executing #{action[:string]} - thread ##{tid} " + (options[:color] ? "[OK]".green : "[OK]")
next_actions = []
@mutex.synchronize {
- action[:executed] = true # set executed
+ action[:executed] = true # set executed flag
# select next action to be executed from successor actions list
# select a successor action that has not been assigned to any thread yet
if action['successors'].length > 0
action['successors'].each { |id|
if @actions[id][:executor].nil?
@@ -194,18 +198,18 @@
# execute the first next actions to this thread
action = @actions[next_actions[0]]
if next_actions.length > 1
# execute other next actions to other threads
for i in 1..(next_actions.length-1)
- assign_action_with_id(next_actions[i])
+ assign_action_with_id(next_actions[i], options)
end
end
end
else
# Execution was failed
- puts "Executing #{action[:string]} - thread ##{tid} [FAILED]".red
+ puts "Executing #{action[:string]} - thread ##{tid} " + (options[:color] ? "[Failed]".red : "[Failed]")
@mutex.synchronize {
@failed = true # set global flag to stop the execution
@actions_failed << action
}
end
@@ -213,10 +217,11 @@
@mutex.synchronize { @threads.delete(tid) }
}
end
- plan['init'].each { |id| assign_action_with_id(id) }
+ ### execute actions without predecessor
+ plan['init'].each { |id| assign_action_with_id(id, options) }
begin
sleep 1
end while @threads.length > 0
(not @failed)
end