lib/nuri/master.rb in nuri-0.5.3 vs lib/nuri/master.rb in nuri-0.5.4
- old
+ new
@@ -92,10 +92,11 @@
(agents.keys - vms.keys).each do |name|
Thread.new {
node_name = name
node_state = get_node_state(node_name, !!p[:push_modules])
mutex.synchronize { state[node_name] = node_state }
+ state[node_name] = node_state
}
end
total = agents.keys.length - vms.keys.length
# wait until all threads have finish
@@ -108,10 +109,11 @@
exist_vms.each_key { |name|
Thread.new {
node_name = name
node_state = get_node_state(node_name, !!p[:push_modules])
mutex.synchronize { state[node_name] = node_state }
+ state[node_name] = node_state
}
}
# get state of non-existing VM nodes
not_exist_vms.each { |name,model|
@@ -122,32 +124,39 @@
wait? { (state.length >= agents.length) }
# update <vm>.in_cloud value
update_cloud_vm_relations(state, vms)
+ agents.merge!(exist_vms)
+ push_agents_list(agents, {:reset => true})
+
state
end
protected
def format_benchmark(benchmark)
- "cpu-time: user=#{benchmark.cutime.round(2)} sys=#{benchmark.cstime.round(2)} total=#{benchmark.total.round(2)}"
+ user = (benchmark.utime + benchmark.cutime).round(3)
+ system = (benchmark.stime + benchmark.cstime).round(3)
+ real = benchmark.real.round(3)
+ "benchmark: user=#{user} sys=#{system} real=#{real}"
end
- def create_plan_task(p={})
+ def create_plan_task(opts={})
task = get_schemata
print "Getting current state "
- puts (p[:color] ? "[Wait]".yellow : "[Wait]")
+ puts (opts[:color] ? "[Wait]".yellow : "[Wait]")
- b = Benchmark.measure do
- task['initial'] = to_state('initial', get_state(p))
+ benchmark = Benchmark.measure do
+ task['initial'] = to_state('initial', get_state(opts))
end
+ #puts YAML.dump(task['initial'])
- print "Getting current state "
- print (p[:color] ? "[OK]".green : "[OK]")
- puts " " + format_benchmark(b)
+ puts "Getting current state " + (opts[:color] ? "[OK] ".green : "[OK] ") +
+ format_benchmark(benchmark)
+ task['initial'].accept(FinalAttributeRemover)
task['initial'].accept(Sfp::Visitor::SfpGenerator.new(task))
f1 = Sfp::Helper::SfpFlatten.new
task['initial'].accept(f1)
# modify condition of procedures of each VM's component
@@ -160,30 +169,30 @@
goal.accept(FinalAttributeRemover)
goal.accept(goalgen)
task['goal'] = goalgen.results
# find dead-node, remove from the task, print WARNING to the console
- dead_nodes = task['initial'].select { |k,v| v.is_a?(Sfp::Unknown) }
- dead_nodes.each_key { |name|
- task['initial'].delete(name)
- task['goal'].keep_if { |k,v| !(k =~ /(\$\.#{name}\.|\$\.#{name}$)/) }
- print (p[:color] ? "[Warn]".red : "[Warn]")
- puts " Removing node #{name} from the task."
- }
+ init = task['initial']
+ init.keys.each do |name|
+ next if not init[name].is_a?(Sfp::Unknown)
+ init.delete(name)
+ task['goal'].keep_if { |var,val| !(var =~ /(\$\.#{name}\.|\$\.#{name}$)/) }
+ puts (opts[:color] ? "[Warn]".red : "[Warn]") + " Remove node #{name} from the planning task."
+ end
# print the status of goal state
puts "Goal state:"
- goalgen.results.each { |k,v|
- next if k[0,1] == '_'
+ goalgen.results.each { |var,val|
+ next if var[0] == '_'
- print " #{k}: "
- value = Sfp::Helper::Sfp2Ruby.val(v['_value']).to_s
- print (p[:color] ? value.green : value) + " "
+ print " #{var}: "
+ value = Sfp::Helper::Sfp2Ruby.val(val['_value']).to_s
+ print (opts[:color] ? value.green : value)
- if f1.results.has_key?(k) and f1.results[k] != v['_value']
- value = Sfp::Helper::Sfp2Ruby.val(f1.results[k]).to_s
- print (p[:color] ? value.red : value)
+ if f1.results.has_key?(var) and f1.results[var] != val['_value']
+ value = Sfp::Helper::Sfp2Ruby.val(f1.results[var]).to_s
+ print " ( " + (opts[:color] ? value.red : value) + " )"
end
puts ""
}
@@ -230,16 +239,17 @@
end
map
end
def get_not_exist_vm_state(model)
- s = {'state' => Sfp::Helper.deep_clone(model)}
+ state = Sfp::Helper.deep_clone(model)
+ s = {'state' => state}
s.accept(VisitorNotExistNodeState)
s.accept(ParentEliminator)
- s['state']['created'] = false
- s['state']['in_cloud'] = {'_context' => 'null', '_value' => CloudSchema}
- s['state']
+ state['created'] = false
+ state['in_cloud'] = {'_context' => 'null', '_value' => CloudSchema}
+ state
end
def update_cloud_vm_relations(state, vms)
@cloudfinder.clouds.each do |cloud|
proxy = state.at?(cloud)
@@ -248,10 +258,11 @@
# to associated VM
proxy['vms'].each do |name,data|
next if not vms.has_key?(name)
if state[name].is_a?(Hash)
state[name]['in_cloud'] = cloud
+ state[name]['created'] = true
elsif state[name].is_a?(Sfp::Unknown)
state[name] = get_dead_vm_state(vms[name], cloud)
end
end
end
@@ -308,30 +319,37 @@
end
end
true
end
- def push_agents_list
+ def push_agents_list(agents=nil, options={})
begin
- agents = {}
+ agents ||= get_agents
+ data = {}
# generate agents list
get_agents.each do |name, model|
next if not model['sfpAddress'].is_a?(String)
address = model['sfpAddress'].to_s.strip
port = model['sfpPort'].to_s.strip.to_i
next if address == '' or port <= 0
- agents[name] = {:sfpAddress => address, :sfpPort => port}
+ data[name] = {:sfpAddress => address, :sfpPort => port}
end
- data = {'agents' => JSON.generate(agents)}
+ json = {'agents' => JSON.generate(data)}
# send the list to all agents
- agents.each do |name, agent|
- code, _ = put_data(agent[:sfpAddress], agent[:sfpPort], '/agents', data, 5, 20)
+ data.each do |name, agent|
+ if options[:reset]
+ delete_data(agent[:sfpAddress], agent[:sfpPort], '/agents')
+ end
+
+ # update current agents list
+ code, _ = put_data(agent[:sfpAddress], agent[:sfpPort], '/agents', json, 5, 20)
raise Exception, "Push agents list to #{agent[:sfpAddress]}:#{agent[:sfpPort]} [Failed]" if code.to_i != 200
end
return true
rescue Exception => exp
+ #$stderr.puts "#{exp}\n#{exp.backtrace.join("\n")}"
end
false
end
###############
@@ -357,13 +375,15 @@
{:agent => agent_model}.accept(finder)
schemata = finder.schemata.uniq.map { |x| x.sub(/^\$\./, '').downcase }
begin
# get modules list
- code, body = get_data(address, port, '/modules')
+ code, body = get_data(address, port, '/modules', DefaultHTTPOpenTimeout, 5)
raise Exception, "Unable to get modules list from #{name}" if code.to_i != 200
+ #puts "#{name}'s modules: #{body}"
+
modules = JSON[body]
tobe_installed_modules = []
schemata.each do |name|
module_dir = "#{@modules_dir}/#{name}"
if File.exist?(module_dir) and
@@ -374,11 +394,13 @@
return true if tobe_installed_modules.length <= 0
### install new modules and replace old ones
list = tobe_installed_modules.join(" ")
- output = JSON.parse(`cd #{@modules_dir}; #{InstallModule} #{address}:#{port} #{list}`)
+ cmd = "#{InstallModule} #{address}:#{port} #{list}"
+ #puts cmd
+ output = JSON.parse(`cd #{@modules_dir}; #{cmd}`)
if output['installed_modules'].length > 0
puts ("Push modules: " + output['installed_modules'].join(" ") + " to agent #{name} [OK]").green
end
if output['missing_modules'].length > 0
puts ("Missing modules: " + output['missing_modules'].join(" ") + ".").red
@@ -433,11 +455,11 @@
port = model[name]['sfpPort'].to_s.strip
if address != '' and port != ''
model = Sfp::Helper.deep_clone(model)
model.accept(ParentEliminator)
data = {'model' => JSON.generate(model)}
- code, _ = put_data(address, port, '/model', data)
+ code, _ = put_data(address, port, '/model', data, DefaultHTTPOpenTimeout, 5)
return (code.to_i == 200)
end
false
end
@@ -446,11 +468,11 @@
def http_get_agent_state(name, model)
return nil if !model[name].is_a?(Hash) or !model[name]['sfpAddress'].is_a?(String)
address = model[name]['sfpAddress'].to_s.strip
port = model[name]['sfpPort'].to_s.strip
if address != '' and port != ''
- code, body = get_data(address, port, '/sfpstate')
+ code, body = get_data(address, port, '/sfpstate', DefaultHTTPOpenTimeout, 20)
if code.to_i == 200 and body.length >= 2
state = JSON[body]
return state['state'] if state.is_a?(Hash)
return state
end
@@ -462,16 +484,13 @@
@model.select { |k,v| k[0,1] != '_' and v.is_a?(Hash) and
v['_context'] == 'class'
}
end
- def get_agents
- Nuri::Master.agents(@model)
- end
-
- def self.agents(sfp)
- sfp.select { |k,v| !(k[0] == '_' or not v.is_a?(Hash) or
+ def get_agents(model=nil)
+ model ||= @model
+ model.select { |k,v| !(k[0] == '_' or not v.is_a?(Hash) or
v['_context'] != 'object' or v['_classes'].index(AgentSchema).nil?)
}
end
def get_vms
@@ -495,39 +514,45 @@
SfpUndefinedNumber = Sfp::Undefined.create('$.Number')
SfpUndefinedBoolean = Sfp::Undefined.create('$.Boolean')
VisitorNotExistNodeState = Object.new
def VisitorNotExistNodeState.visit(name, value, parent)
+ parent.delete(name) if name == '_parent'
return false if name[0,1] == '_'
- if not value.is_a?(Hash)
+ if value.is_a?(Hash)
+ case value['_context']
+ when 'null', 'any_value'
+ parent[name] = Sfp::Undefined.create(value['_isa'])
+ when 'object', 'procedure'
+ # do nothing
+ else
+ parent.delete(name)
+ end
+ else
if value.is_a?(String)
if value.isref
ref_value = parent.at?(value)
# TODO - need to handle a reference to a primitive value
if ref_value.is_a?(Hash) and (ref_value.isobject or ref_value.isnull)
parent[name] = Sfp::Undefined.create(ref_value['_isa'])
elsif ref_value.is_a?(Sfp::Undefined) or ref_value.is_a?(Sfp::Unknown)
parent[name] = ref_value
else
- puts "[WARN] Sfp::Undefined => #{parent.ref.push(name)}: #{ref_value.class.name}"
+ #puts "[WARN] Sfp::Undefined => #{parent.ref.push(name)}: #{ref_value.class.name}"
parent[name] = SfpUndefined
end
else
parent[name] = SfpUndefinedString
end
elsif value.is_a?(Fixnum) or value.is_a?(Float)
parent[name] = SfpUndefinedNumber
elsif value.is_a?(TrueClass) or value.is_a?(FalseClass)
parent[name] = SfpUndefinedBoolean
else
- puts "[WARN] Sfp::Undefined => " + parent.ref.push(name) + ": " + value.class.name
+ #puts "[WARN] Sfp::Undefined => " + parent.ref.push(name) + ": " + value.class.name
parent[name] = SfpUndefined
end
- elsif value['_context'] == 'null' or value['_context'] == 'any_value'
- parent[name] = Sfp::Undefined.create(value['_isa'])
- elsif value['_context'] != 'object'
- parent.delete(name)
end
true
end
VisitorDeadAgentNodeState = Object.new
@@ -541,22 +566,22 @@
if ref_value.is_a?(Hash) and (ref_value.isobject or ref_value.isnull)
parent[name] = Sfp::Unknown.create(ref_value['_isa'])
elsif ref_value.is_a?(Sfp::Unknown) or ref_value.is_a?(Sfp::Unknown)
parent[name] = ref_value
else
- puts "[WARN] Sfp::Unknown => #{parent.ref.push(name)}: #{ref_value.class.name}"
+ #puts "[WARN] Sfp::Unknown => #{parent.ref.push(name)}: #{ref_value.class.name}"
parent[name] = SfpUnknown
end
else
parent[name] = Sfp::Unknown.create('$.String')
end
elsif value.is_a?(Fixnum) or value.is_a?(Float)
parent[name] = Sfp::Unknown.create('$.Number')
elsif value.is_a?(TrueClass) or value.is_a?(FalseClass)
parent[name] = Sfp::Unknown.create('$.Boolean')
else
- puts "[WARN] Sfp::Unknown => " + parent.ref.push(name) + ": " + value.class.name
+ #puts "[WARN] Sfp::Unknown => " + parent.ref.push(name) + ": " + value.class.name
parent[name] = SfpUnknown
end
elsif value['_context'] == 'null' or value['_context'] == 'any_value'
parent[name] = Sfp::Unknown.create(value['_isa'])
elsif value['_context'] != 'object'
@@ -626,11 +651,14 @@
# because these will be assigned dynamically
var = parser.variables[k]
next if var.nil? # the variable is not found
if v.is_a?(Hash)
- val = parser.types[v['_value']][0] if v['_context'] == 'null'
- raise Exception, "Not implemented yet." # this may arise on Set values
+ if v['_context'] == 'null'
+ val = parser.types[v['_value']][0]
+ else
+ raise Exception, "Not implemented yet." # this may arise on Set values
+ end
else
val = v
end
# add the value to variable's values