lib/nuri/master.rb in nuri-0.5.3 vs lib/nuri/master.rb in nuri-0.5.4

- old
+ new

@@ -92,10 +92,11 @@ (agents.keys - vms.keys).each do |name| Thread.new { node_name = name node_state = get_node_state(node_name, !!p[:push_modules]) mutex.synchronize { state[node_name] = node_state } + state[node_name] = node_state } end total = agents.keys.length - vms.keys.length # wait until all threads have finish @@ -108,10 +109,11 @@ exist_vms.each_key { |name| Thread.new { node_name = name node_state = get_node_state(node_name, !!p[:push_modules]) mutex.synchronize { state[node_name] = node_state } + state[node_name] = node_state } } # get state of non-existing VM nodes not_exist_vms.each { |name,model| @@ -122,32 +124,39 @@ wait? { (state.length >= agents.length) } # update <vm>.in_cloud value update_cloud_vm_relations(state, vms) + agents.merge!(exist_vms) + push_agents_list(agents, {:reset => true}) + state end protected def format_benchmark(benchmark) - "cpu-time: user=#{benchmark.cutime.round(2)} sys=#{benchmark.cstime.round(2)} total=#{benchmark.total.round(2)}" + user = (benchmark.utime + benchmark.cutime).round(3) + system = (benchmark.stime + benchmark.cstime).round(3) + real = benchmark.real.round(3) + "benchmark: user=#{user} sys=#{system} real=#{real}" end - def create_plan_task(p={}) + def create_plan_task(opts={}) task = get_schemata print "Getting current state " - puts (p[:color] ? "[Wait]".yellow : "[Wait]") + puts (opts[:color] ? "[Wait]".yellow : "[Wait]") - b = Benchmark.measure do - task['initial'] = to_state('initial', get_state(p)) + benchmark = Benchmark.measure do + task['initial'] = to_state('initial', get_state(opts)) end + #puts YAML.dump(task['initial']) - print "Getting current state " - print (p[:color] ? "[OK]".green : "[OK]") - puts " " + format_benchmark(b) + puts "Getting current state " + (opts[:color] ? "[OK] ".green : "[OK] ") + + format_benchmark(benchmark) + task['initial'].accept(FinalAttributeRemover) task['initial'].accept(Sfp::Visitor::SfpGenerator.new(task)) f1 = Sfp::Helper::SfpFlatten.new task['initial'].accept(f1) # modify condition of procedures of each VM's component @@ -160,30 +169,30 @@ goal.accept(FinalAttributeRemover) goal.accept(goalgen) task['goal'] = goalgen.results # find dead-node, remove from the task, print WARNING to the console - dead_nodes = task['initial'].select { |k,v| v.is_a?(Sfp::Unknown) } - dead_nodes.each_key { |name| - task['initial'].delete(name) - task['goal'].keep_if { |k,v| !(k =~ /(\$\.#{name}\.|\$\.#{name}$)/) } - print (p[:color] ? "[Warn]".red : "[Warn]") - puts " Removing node #{name} from the task." - } + init = task['initial'] + init.keys.each do |name| + next if not init[name].is_a?(Sfp::Unknown) + init.delete(name) + task['goal'].keep_if { |var,val| !(var =~ /(\$\.#{name}\.|\$\.#{name}$)/) } + puts (opts[:color] ? "[Warn]".red : "[Warn]") + " Remove node #{name} from the planning task." + end # print the status of goal state puts "Goal state:" - goalgen.results.each { |k,v| - next if k[0,1] == '_' + goalgen.results.each { |var,val| + next if var[0] == '_' - print " #{k}: " - value = Sfp::Helper::Sfp2Ruby.val(v['_value']).to_s - print (p[:color] ? value.green : value) + " " + print " #{var}: " + value = Sfp::Helper::Sfp2Ruby.val(val['_value']).to_s + print (opts[:color] ? value.green : value) - if f1.results.has_key?(k) and f1.results[k] != v['_value'] - value = Sfp::Helper::Sfp2Ruby.val(f1.results[k]).to_s - print (p[:color] ? value.red : value) + if f1.results.has_key?(var) and f1.results[var] != val['_value'] + value = Sfp::Helper::Sfp2Ruby.val(f1.results[var]).to_s + print " ( " + (opts[:color] ? value.red : value) + " )" end puts "" } @@ -230,16 +239,17 @@ end map end def get_not_exist_vm_state(model) - s = {'state' => Sfp::Helper.deep_clone(model)} + state = Sfp::Helper.deep_clone(model) + s = {'state' => state} s.accept(VisitorNotExistNodeState) s.accept(ParentEliminator) - s['state']['created'] = false - s['state']['in_cloud'] = {'_context' => 'null', '_value' => CloudSchema} - s['state'] + state['created'] = false + state['in_cloud'] = {'_context' => 'null', '_value' => CloudSchema} + state end def update_cloud_vm_relations(state, vms) @cloudfinder.clouds.each do |cloud| proxy = state.at?(cloud) @@ -248,10 +258,11 @@ # to associated VM proxy['vms'].each do |name,data| next if not vms.has_key?(name) if state[name].is_a?(Hash) state[name]['in_cloud'] = cloud + state[name]['created'] = true elsif state[name].is_a?(Sfp::Unknown) state[name] = get_dead_vm_state(vms[name], cloud) end end end @@ -308,30 +319,37 @@ end end true end - def push_agents_list + def push_agents_list(agents=nil, options={}) begin - agents = {} + agents ||= get_agents + data = {} # generate agents list get_agents.each do |name, model| next if not model['sfpAddress'].is_a?(String) address = model['sfpAddress'].to_s.strip port = model['sfpPort'].to_s.strip.to_i next if address == '' or port <= 0 - agents[name] = {:sfpAddress => address, :sfpPort => port} + data[name] = {:sfpAddress => address, :sfpPort => port} end - data = {'agents' => JSON.generate(agents)} + json = {'agents' => JSON.generate(data)} # send the list to all agents - agents.each do |name, agent| - code, _ = put_data(agent[:sfpAddress], agent[:sfpPort], '/agents', data, 5, 20) + data.each do |name, agent| + if options[:reset] + delete_data(agent[:sfpAddress], agent[:sfpPort], '/agents') + end + + # update current agents list + code, _ = put_data(agent[:sfpAddress], agent[:sfpPort], '/agents', json, 5, 20) raise Exception, "Push agents list to #{agent[:sfpAddress]}:#{agent[:sfpPort]} [Failed]" if code.to_i != 200 end return true rescue Exception => exp + #$stderr.puts "#{exp}\n#{exp.backtrace.join("\n")}" end false end ############### @@ -357,13 +375,15 @@ {:agent => agent_model}.accept(finder) schemata = finder.schemata.uniq.map { |x| x.sub(/^\$\./, '').downcase } begin # get modules list - code, body = get_data(address, port, '/modules') + code, body = get_data(address, port, '/modules', DefaultHTTPOpenTimeout, 5) raise Exception, "Unable to get modules list from #{name}" if code.to_i != 200 + #puts "#{name}'s modules: #{body}" + modules = JSON[body] tobe_installed_modules = [] schemata.each do |name| module_dir = "#{@modules_dir}/#{name}" if File.exist?(module_dir) and @@ -374,11 +394,13 @@ return true if tobe_installed_modules.length <= 0 ### install new modules and replace old ones list = tobe_installed_modules.join(" ") - output = JSON.parse(`cd #{@modules_dir}; #{InstallModule} #{address}:#{port} #{list}`) + cmd = "#{InstallModule} #{address}:#{port} #{list}" + #puts cmd + output = JSON.parse(`cd #{@modules_dir}; #{cmd}`) if output['installed_modules'].length > 0 puts ("Push modules: " + output['installed_modules'].join(" ") + " to agent #{name} [OK]").green end if output['missing_modules'].length > 0 puts ("Missing modules: " + output['missing_modules'].join(" ") + ".").red @@ -433,11 +455,11 @@ port = model[name]['sfpPort'].to_s.strip if address != '' and port != '' model = Sfp::Helper.deep_clone(model) model.accept(ParentEliminator) data = {'model' => JSON.generate(model)} - code, _ = put_data(address, port, '/model', data) + code, _ = put_data(address, port, '/model', data, DefaultHTTPOpenTimeout, 5) return (code.to_i == 200) end false end @@ -446,11 +468,11 @@ def http_get_agent_state(name, model) return nil if !model[name].is_a?(Hash) or !model[name]['sfpAddress'].is_a?(String) address = model[name]['sfpAddress'].to_s.strip port = model[name]['sfpPort'].to_s.strip if address != '' and port != '' - code, body = get_data(address, port, '/sfpstate') + code, body = get_data(address, port, '/sfpstate', DefaultHTTPOpenTimeout, 20) if code.to_i == 200 and body.length >= 2 state = JSON[body] return state['state'] if state.is_a?(Hash) return state end @@ -462,16 +484,13 @@ @model.select { |k,v| k[0,1] != '_' and v.is_a?(Hash) and v['_context'] == 'class' } end - def get_agents - Nuri::Master.agents(@model) - end - - def self.agents(sfp) - sfp.select { |k,v| !(k[0] == '_' or not v.is_a?(Hash) or + def get_agents(model=nil) + model ||= @model + model.select { |k,v| !(k[0] == '_' or not v.is_a?(Hash) or v['_context'] != 'object' or v['_classes'].index(AgentSchema).nil?) } end def get_vms @@ -495,39 +514,45 @@ SfpUndefinedNumber = Sfp::Undefined.create('$.Number') SfpUndefinedBoolean = Sfp::Undefined.create('$.Boolean') VisitorNotExistNodeState = Object.new def VisitorNotExistNodeState.visit(name, value, parent) + parent.delete(name) if name == '_parent' return false if name[0,1] == '_' - if not value.is_a?(Hash) + if value.is_a?(Hash) + case value['_context'] + when 'null', 'any_value' + parent[name] = Sfp::Undefined.create(value['_isa']) + when 'object', 'procedure' + # do nothing + else + parent.delete(name) + end + else if value.is_a?(String) if value.isref ref_value = parent.at?(value) # TODO - need to handle a reference to a primitive value if ref_value.is_a?(Hash) and (ref_value.isobject or ref_value.isnull) parent[name] = Sfp::Undefined.create(ref_value['_isa']) elsif ref_value.is_a?(Sfp::Undefined) or ref_value.is_a?(Sfp::Unknown) parent[name] = ref_value else - puts "[WARN] Sfp::Undefined => #{parent.ref.push(name)}: #{ref_value.class.name}" + #puts "[WARN] Sfp::Undefined => #{parent.ref.push(name)}: #{ref_value.class.name}" parent[name] = SfpUndefined end else parent[name] = SfpUndefinedString end elsif value.is_a?(Fixnum) or value.is_a?(Float) parent[name] = SfpUndefinedNumber elsif value.is_a?(TrueClass) or value.is_a?(FalseClass) parent[name] = SfpUndefinedBoolean else - puts "[WARN] Sfp::Undefined => " + parent.ref.push(name) + ": " + value.class.name + #puts "[WARN] Sfp::Undefined => " + parent.ref.push(name) + ": " + value.class.name parent[name] = SfpUndefined end - elsif value['_context'] == 'null' or value['_context'] == 'any_value' - parent[name] = Sfp::Undefined.create(value['_isa']) - elsif value['_context'] != 'object' - parent.delete(name) end true end VisitorDeadAgentNodeState = Object.new @@ -541,22 +566,22 @@ if ref_value.is_a?(Hash) and (ref_value.isobject or ref_value.isnull) parent[name] = Sfp::Unknown.create(ref_value['_isa']) elsif ref_value.is_a?(Sfp::Unknown) or ref_value.is_a?(Sfp::Unknown) parent[name] = ref_value else - puts "[WARN] Sfp::Unknown => #{parent.ref.push(name)}: #{ref_value.class.name}" + #puts "[WARN] Sfp::Unknown => #{parent.ref.push(name)}: #{ref_value.class.name}" parent[name] = SfpUnknown end else parent[name] = Sfp::Unknown.create('$.String') end elsif value.is_a?(Fixnum) or value.is_a?(Float) parent[name] = Sfp::Unknown.create('$.Number') elsif value.is_a?(TrueClass) or value.is_a?(FalseClass) parent[name] = Sfp::Unknown.create('$.Boolean') else - puts "[WARN] Sfp::Unknown => " + parent.ref.push(name) + ": " + value.class.name + #puts "[WARN] Sfp::Unknown => " + parent.ref.push(name) + ": " + value.class.name parent[name] = SfpUnknown end elsif value['_context'] == 'null' or value['_context'] == 'any_value' parent[name] = Sfp::Unknown.create(value['_isa']) elsif value['_context'] != 'object' @@ -626,11 +651,14 @@ # because these will be assigned dynamically var = parser.variables[k] next if var.nil? # the variable is not found if v.is_a?(Hash) - val = parser.types[v['_value']][0] if v['_context'] == 'null' - raise Exception, "Not implemented yet." # this may arise on Set values + if v['_context'] == 'null' + val = parser.types[v['_value']][0] + else + raise Exception, "Not implemented yet." # this may arise on Set values + end else val = v end # add the value to variable's values