# -*- coding: utf-8 -*- require 'spec_helper' require 'tengine/rspec' require 'net/ssh' describe 'job_control_driver' do include Tengine::RSpec::Extension target_dsl File.expand_path("../../../../lib/tengine/job/drivers/job_control_driver.rb", File.dirname(__FILE__)) driver :job_control_driver context "rjn0001" do before do Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @jobnet = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @jobnet.id, }) end context "ジョブの起動イベントを受け取ったら" do it "通常の場合" do @jobnet.phase_key = :starting @ctx.edge(:e1).phase_key = :transmitting @ctx.vertex(:j11).phase_key = :ready @jobnet.save! @jobnet.reload tengine.should_not_fire mock_ssh = mock(:ssh) mock_channel = mock(:channel) Net::SSH.should_receive(:start). with("localhost", an_instance_of(Tengine::Resource::Credential), an_instance_of(Hash)).and_yield(mock_ssh) mock_ssh.should_receive(:open_channel).and_yield(mock_channel) mock_channel.should_receive(:exec) do |*args| args.length.should == 1 # args.first.should =~ %r<source \/etc\/profile && export MM_ACTUAL_JOB_ID=[0-9a-f]{24} MM_ACTUAL_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" MM_FULL_ACTUAL_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" MM_ACTUAL_JOB_NAME_PATH=\\"/rjn0001/j11\\" MM_ACTUAL_JOB_SECURITY_TOKEN= MM_SCHEDULE_ID=[0-9a-f]{24} MM_SCHEDULE_ESTIMATED_TIME= MM_TEMPLATE_JOB_ID=[0-9a-f]{24} MM_TEMPLATE_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" && tengine_job_agent_run -- \$HOME/j11\.sh> args.first.should =~ %r<source \/etc\/profile> args.first.should =~ %r<MM_ACTUAL_JOB_ID=[0-9a-f]{24} MM_ACTUAL_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\" MM_FULL_ACTUAL_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\" MM_ACTUAL_JOB_NAME_PATH=\"/rjn0001/j11\" MM_ACTUAL_JOB_SECURITY_TOKEN= MM_SCHEDULE_ID=[0-9a-f]{24} MM_SCHEDULE_ESTIMATED_TIME= MM_TEMPLATE_JOB_ID=[0-9a-f]{24} MM_TEMPLATE_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\"> args.first.should =~ %r<job_test j11> end tengine.receive("start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, }) @jobnet.reload @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :active @ctx.vertex(:j11).phase_key.should == :starting end context "starting直前stopによってinitializedになっている場合" do [:starting, :running].each do |root_phase_key| it "ルートが#{root_phase_key}" do @jobnet.phase_key = root_phase_key @ctx[:e1].phase_key = :closing @ctx[:e2].phase_key = :closing @ctx[:e3].phase_key = :closing @ctx[:j11].phase_key = :initialized @jobnet.save! @jobnet.reload tengine.should_fire(:"error.jobnet.job.tengine", { :source_name => @ctx[:root].name_as_resource, :properties=>{ :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, } }) tengine.receive("start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, }) @jobnet.reload @ctx.edge(:e1).phase_key.should == :closing @ctx.edge(:e2).phase_key.should == :closed @ctx.edge(:e3).phase_key.should == :closed @ctx.vertex(:j11).phase_key.should == :initialized @jobnet.phase_key.should == :error end end end it "存在しないスクリプトを実行しようとした場合、標準エラー出力にエラーメッセージが返されるので、それを保持する" do @jobnet.phase_key = :starting @ctx.edge(:e1).phase_key = :transmitting @ctx.vertex(:j11).phase_key = :ready @jobnet.save! @jobnet.reload mock_ssh = mock(:ssh) Net::SSH.stub(:start).with(any_args).and_yield(mock_ssh) mock_channel = mock(:channel) mock_ssh.stub(:open_channel).and_yield(mock_channel) mock_channel.stub(:exec).with(any_args).and_yield(mock_channel, true) mock_channel.stub(:on_data) mock_channel.should_receive(:on_extended_data).and_yield(mock_channel, "session", "[Errno::ENOENT] No such file or directory - /home/goku/unexist_script.sh") mock_channel.stub(:on_close) tengine.should_fire(:"error.job.job.tengine", { :source_name => @ctx[:j11].name_as_resource, :properties=>{ :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, :exit_status=>nil, :message=>"Failure to execute /rjn0001/j11 via SSH: [Errno::ENOENT] No such file or directory - /home/goku/unexist_script.sh" } }) tengine.receive("start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, }) @jobnet.reload @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :active @ctx.vertex(:j11).tap do |job| job.phase_key.should == :error job.error_messages.should == [ "[Errno::ENOENT] No such file or directory - /home/goku/unexist_script.sh", "Failure to execute /rjn0001/j11 via SSH: [Errno::ENOENT] No such file or directory - /home/goku/unexist_script.sh" ] end @jobnet.phase_key.should == :running end end it "PIDを取得できたら" do @ctx.edge(:e1).phase_key = :transmitted @ctx.edge(:e2).phase_key = :active @ctx.vertex(:j11).phase_key = :starting @jobnet.save! @jobnet.reload tengine.should_not_fire mock_event = mock(:event) @pid = "123" signal = Tengine::Job::Signal.new(mock_event) signal.data = {:executing_pid => @pid} @ctx.vertex(:j11).ack(signal) # このメソッド内ではsaveされないので、ここでreloadもしません。 @ctx.vertex(:j11).executing_pid.should == @pid @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :active @ctx.vertex(:j11).phase_key.should == :running end test_error_message1 = "Job process failed. STDOUT and STDERR were redirected to files. You can see them at /home/goku/stdout-1234.log and /home/goku/stderr-1234.log on the server test_server1" { :success => ["0", {}], :error => ["1", { :stdout_log => "/home/goku/stdout-1234.log", :stderr_log => "/home/goku/stderr-1234.log", :message => test_error_message1 }] }.each do |phase_key, (exit_status, extra_props)| it "ジョブ実行#{phase_key}の通知" do test_key = "test_key.finished.process.job.tengine" Tengine::Core::Event.delete_all(:conditions => {:key => test_key}) Tengine::Core::Event.create!(:event_type_name => "job.heartbeat.tengine", :key => test_key) @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.executing_pid = "123" j11.phase_key = :running j11.previous_edges.length.should == 1 j11.previous_edges.first.phase_key = :transmitted @ctx[:root].save! tengine.should_fire(:"#{phase_key}.job.job.tengine", :source_name => @ctx[:j11].name_as_resource, :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx[:j11].id.to_s, :target_job_name_path => @ctx[:j11].name_path, :exit_status => exit_status }) tengine.receive(:"finished.process.job.tengine", :key => test_key, :source_name => @ctx[:j11].name_as_resource, :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx[:j11].id.to_s, :target_job_name_path => @ctx[:j11].name_path, :exit_status => exit_status }.merge(extra_props)) @jobnet.reload @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :active @ctx.vertex(:j11).tap do |j| j.phase_key.should == phase_key j.exit_status.should == exit_status if phase_key == :error j.error_messages.should == [test_error_message1] end end end end it "stuckからのfinished.process.job.tengine" do @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.phase_key = :stuck j11.previous_edges.first.phase_key = :transmitted @ctx[:root].save! tengine.receive(:"finished.process.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => @jobnet.name_path, :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => @jobnet.name_path, :target_job_id => @ctx[:j11].id.to_s, :target_job_name_path => @ctx[:j11].name_path, :exit_status => 0 }) @jobnet.reload @ctx.vertex(:j11).phase_key.should == :stuck end it "強制停止" do @pid = "123" @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.executing_pid = @pid j11.phase_key = :running j11.previous_edges.length.should == 1 j11.previous_edges.first.phase_key = :transmitted @ctx[:root].save! tengine.should_not_fire mock_ssh = mock(:ssh) mock_channel = mock(:channel) Net::SSH.should_receive(:start). with("localhost", an_instance_of(Tengine::Resource::Credential), an_instance_of(Hash)).and_yield(mock_ssh) mock_ssh.should_receive(:open_channel).and_yield(mock_channel) mock_channel.should_receive(:exec) do |*args| interval = Tengine::Job::Killing::DEFAULT_KILLING_SIGNAL_INTERVAL args.length.should == 1 args.first.should =~ %r<source \/etc\/profile> args.first.should =~ %r<tengine_job_agent_kill #{@pid} #{interval} KILL$> end tengine.receive(:"stop.job.job.tengine", :source_name => @ctx[:j11].name_as_resource, :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :target_jobnet_id => @jobnet.id.to_s, :target_job_id => @ctx[:j11].id.to_s, }) @jobnet.reload @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :active @ctx.vertex(:j11).tap do |j| j.phase_key.should == :dying j.exit_status.should == nil end end it "強制停止(ジョブネット)" do @pid11 = "11" @pid12 = "12" @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.executing_pid = @pid11 j11.phase_key = :success j11.previous_edges.length.should == 1 j11.previous_edges.first.phase_key = :transmitted j12 = @jobnet.find_descendant_by_name_path("/rjn0001/j12") j12.executing_pid = @pid12 j12.phase_key = :running j12.previous_edges.length.should == 1 j12.previous_edges.first.phase_key = :transmitted @ctx[:root].save! # phase_key が success の j11 は fireされない tengine.should_not_fire(:"stop.job.job.tengine") # phase_key が running の j12 は fireされる tengine.should_fire(:"stop.job.job.tengine", :source_name => @ctx[:j12].name_as_resource, :properties => { :stop_reason => "user_stop", :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => "/rjn0001", :target_job_id => @ctx[:j12].id.to_s, :target_job_name_path => "/rjn0001/j12", :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => "/rjn0001", }) # jobnet に対して強制停止された tengine.receive(:"stop.jobnet.job.tengine", :source_name => @jobnet.name_as_resource, :properties => { :stop_reason => "user_stop", :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => "/rjn0001", :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => "/rjn0001", }) end it "強制停止(後続のジョブ)" do @pid11 = "11" @pid12 = "12" @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.executing_pid = @pid11 j11.phase_key = :success j11.previous_edges.length.should == 1 j11.previous_edges.first.phase_key = :transmitted j12 = @jobnet.find_descendant_by_name_path("/rjn0001/j12") j12.executing_pid = @pid12 j12.phase_key = :running j12.previous_edges.length.should == 1 j12.previous_edges.first.phase_key = :transmitted @ctx[:root].save! mock_ssh = mock(:ssh) mock_channel = mock(:channel) Net::SSH.should_receive(:start). with("localhost", an_instance_of(Tengine::Resource::Credential), an_instance_of(Hash)).and_yield(mock_ssh) mock_ssh.should_receive(:open_channel).and_yield(mock_channel) mock_channel.should_receive(:exec) do |*args| interval = Tengine::Job::Killing::DEFAULT_KILLING_SIGNAL_INTERVAL args.length.should == 1 args.first.should =~ %r<source \/etc\/profile> args.first.should =~ %r<tengine_job_agent_kill #{@pid12} #{interval} KILL$> end # job12 に対して強制停止 tengine.receive(:"stop.job.job.tengine", :source_name => @ctx[:j12].name_as_resource, :properties => { :stop_reason => "user_stop", :target_jobnet_id => @jobnet.id.to_s, :target_jobnet_name_path => "/rjn0001", :target_job_id => @ctx[:j12].id.to_s, :target_job_name_path => "/rjn0001/j12", :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :root_jobnet_name_path => "/rjn0001", }) @jobnet.reload @ctx.edge(:e1).phase_key.should == :transmitted @ctx.edge(:e2).phase_key.should == :transmitted @ctx.edge(:e3).phase_key.should == :active @ctx.vertex(:j11).tap do |j| j.phase_key.should == :success j.stop_reason.should == nil end @ctx.vertex(:j12).tap do |j| j.phase_key.should == :dying j.stop_reason.should == "user_stop" end end if ENV['PASSWORD'] context "実際にSSHで接続", :ssh_actual => true do before do resource_fixture = GokuAtEc2ApNortheast.new credential = resource_fixture.goku_ssh_pw credential.auth_values = {:username => ENV['USER'], :password => ENV['PASSWORD']} credential.save! server = resource_fixture.hadoop_master_node server.local_ipv4 = "127.0.0.1" server.save! end it do tengine.should_not_fire tengine.receive("start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @jobnet.id.to_s, :target_jobnet_id => @jobnet.id.to_s, }) @jobnet.reload j11 = @jobnet.find_descendant_by_name_path("/rjn0001/j11") j11.executing_pid.should_not be_nil j11.exit_status.should == nil j11.phase_key.should == :running j11.previous_edges.length.should == 1 j11.previous_edges.first.phase_key.should == :transmitted end end end end context "再実行" do context "ジョブを再実行" do { false => "後続も実行", true => "スポット再実行" }.each do |spot, caption| context(caption) do before do Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @root = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, :spot => spot, :retry => true, :target_actual_ids => [@ctx[:j11].id.to_s] }) @root.phase_key = :running @ctx[:j11].phase_key = :success @ctx[:j12].phase_key = :error @ctx[:e1].phase_key = :transmitted @ctx[:e2].phase_key = :transmitted @ctx[:e3].phase_key = :active end [:initialized, :success, :error, :stuck].each do |phase_key| it "phase_keyが#{phase_key}ならば再実行できるので、startのイベントを発火する" do @ctx[:j11].phase_key = phase_key @root.save! tengine.should_fire(:"start.job.job.tengine", { :source_name => @ctx[:j11].name_as_resource, :properties=>{ :execution_id => @execution.id.to_s, :root_jobnet_name_path => @root.name_path, :root_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, } }) tengine.receive("restart.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :root_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, :target_job_id => @ctx.vertex(:j11).id.to_s, :target_job_name_path => @ctx.vertex(:j11).name_path, }) @root.reload @root.phase_key.should == :running @ctx.edge(:e1).phase_key.should == :transmitted @ctx.vertex(:j11).phase_key.should == :ready if spot @ctx.vertex(:j12).phase_key.should == :error @ctx.edge(:e2).phase_key.should == :transmitted @ctx.edge(:e3).phase_key.should == :active else @ctx.vertex(:j12).phase_key.should == :initialized @ctx.edge(:e2).phase_key.should == :active @ctx.edge(:e3).phase_key.should == :active end end end [:ready, :starting, :running, :dying].each do |phase_key| it "phase_keyが#{phase_key}ならば再実行できず、エラーのイベントを発火する" do @ctx[:j11].phase_key = phase_key @root.save! tengine.should_fire("restart.job.job.tengine.error.tengined").with(any_args) Tengine::Core::Kernel.temp_exception_reporter(:except_test) do tengine.receive("restart.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :target_jobnet_id => @root.id.to_s, :target_job_id => @ctx.vertex(:j11).id.to_s, }) end # 再実行に失敗したのでルートジョブネット以下何も状態は変更されません @root.reload @root.phase_key.should == :running @ctx.edge(:e1).phase_key.should == :transmitted @ctx.vertex(:j11).phase_key.should == phase_key end end end end end end context "<BUG>同じジョブネットが複数バージョン存在する際、ジョブ実行時にスクリプトに渡される環境変数の「MM_TEMPLATE_JOB_ID」「MM_TEMPLATE_JOB_ANCESTOR_IDS」が実行しているバージョン以外のものがセットされている" do shared_examples_for "最新のバージョンのルートジョブネットを参照する" do |dsl_version| it do @root.phase_key = :starting @root.element("prev!j11").phase_key = :transmitting @root.element('j11').phase_key = :ready @root.save! @root.reload tengine.should_not_fire mock_ssh = mock(:ssh) mock_channel = mock(:channel) Net::SSH.should_receive(:start). with("localhost", an_instance_of(Tengine::Resource::Credential), an_instance_of(Hash)).and_yield(mock_ssh) mock_ssh.should_receive(:open_channel).and_yield(mock_channel) mock_channel.should_receive(:exec) do |*args| args.length.should == 1 # args.first.should =~ %r<source \/etc\/profile && export MM_ACTUAL_JOB_ID=[0-9a-f]{24} MM_ACTUAL_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" MM_FULL_ACTUAL_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" MM_ACTUAL_JOB_NAME_PATH=\\"/rjn0001/j11\\" MM_ACTUAL_JOB_SECURITY_TOKEN= MM_SCHEDULE_ID=[0-9a-f]{24} MM_SCHEDULE_ESTIMATED_TIME= MM_TEMPLATE_JOB_ID=[0-9a-f]{24} MM_TEMPLATE_JOB_ANCESTOR_IDS=\\"[0-9a-f]{24}\\" && tengine_job_agent_run -- \$HOME/j11\.sh> args.first.should =~ %r<source \/etc\/profile> args.first.should =~ %r<MM_ACTUAL_JOB_ID=[0-9a-f]{24} MM_ACTUAL_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\" MM_FULL_ACTUAL_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\" MM_ACTUAL_JOB_NAME_PATH=\"/rjn0001/j11\" MM_ACTUAL_JOB_SECURITY_TOKEN= MM_SCHEDULE_ID=[0-9a-f]{24} MM_SCHEDULE_ESTIMATED_TIME= MM_TEMPLATE_JOB_ID=[0-9a-f]{24} MM_TEMPLATE_JOB_ANCESTOR_IDS=\"[0-9a-f]{24}\"> @template.dsl_version.should == dsl_version template_job = @template.element("/rjn0001/j11") args.first.should =~ %r<MM_TEMPLATE_JOB_ID=#{template_job.id.to_s}> args.first.should =~ %r<MM_TEMPLATE_JOB_ANCESTOR_IDS=\"#{@template.id.to_s}\"> args.first.should =~ %r<job_test j11> end tengine.receive("start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :root_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, :target_job_id => @root.element('j11').id.to_s, :target_job_name_path => @root.element('j11').name_path, }) @root.reload @root.element('prev!j11').phase_key.should == :transmitted @root.element('next!j11').phase_key.should == :active @root.element('j11').phase_key.should == :starting end end context "バージョン1つだけ" do before do Tengine::Core::Setting.delete_all Tengine::Core::Setting.create!(:name => "dsl_version", :value => "1") Tengine::Job::Vertex.delete_all Rjn0001SimpleJobnetBuilder.new.tap do |builder| @template = builder.create_template(:dsl_version => "1") @root = @template.generate @ctx = builder.context end @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) end it{ @root.template.dsl_version.should == "1" } it_should_behave_like "最新のバージョンのルートジョブネットを参照する", "1" end context "バージョン2つ" do before do Tengine::Core::Setting.delete_all Tengine::Core::Setting.create!(:name => "dsl_version", :value => "2") Tengine::Job::Vertex.delete_all Rjn0001SimpleJobnetBuilder.new.tap do |builder| builder.create_template(:dsl_version => "1") @template = builder.create_template(:dsl_version => "2") @root = @template.generate @ctx = builder.context end @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) end it{ @root.template.dsl_version.should == "2" } it_should_behave_like "最新のバージョンのルートジョブネットを参照する", "2" end context "バージョン10個" do before do Tengine::Core::Setting.delete_all Tengine::Core::Setting.create!(:name => "dsl_version", :value => "10") Tengine::Job::Vertex.delete_all Rjn0001SimpleJobnetBuilder.new.tap do |builder| (1..9).each do |idx| builder.create_template(:dsl_version => idx.to_s) end @template = builder.create_template(:dsl_version => "10") @root = @template.generate @ctx = builder.context end @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) end it{ @root.template.dsl_version.should == "10" } it_should_behave_like "最新のバージョンのルートジョブネットを参照する", "10" end end context "https://www.pivotaltracker.com/story/show/22624209" do it "stuckにする" do Tengine::Core::Schedule.delete_all Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @root = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) @root.phase_key = :initialized @root.save! EM.run_block do tengine.receive("expired.job.heartbeat.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :target_job_id => @root.children[1].id.to_s, }) end @root.reload @root.children[1].phase_key.should == :stuck @root.phase_key.should_not == :stuck # initialized end end context "start.job.job.tengine.failed.tengined" do it "stuckにする" do Tengine::Core::Schedule.delete_all Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @root = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) @root.phase_key = :initialized @root.save! EM.run_block do tengine.receive("start.job.job.tengine.failed.tengined", :properties => { :original_event => { :event_type_name => "start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :root_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, :target_job_id => @root.children[1].id.to_s, }}}) end @root.reload @root.children[1].phase_key.should == :stuck @root.phase_key.should_not == :stuck # initialized end it "broken event" do Tengine::Core::Schedule.delete_all Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @root = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) @root.phase_key = :initialized @root.save! EM.run_block do tengine.receive("start.job.job.tengine.failed.tengined", :properties => { :original_event => { :event_type_name => "start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :root_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, }}}) end @root.reload @root.children[1].phase_key.should == :initialized @root.phase_key.should_not == :stuck # initialized end end %w[ stop.job.job.tengine.failed.tengined finished.process.job.tengine.failed.tengined expired.job.heartbeat.tengine.failed.tengined restart.job.job.tengine.failed.tengined ].each do |i| describe i do it "stuckにする" do Tengine::Core::Schedule.delete_all Tengine::Job::Vertex.delete_all builder = Rjn0001SimpleJobnetBuilder.new @root = builder.create_actual @ctx = builder.context @execution = Tengine::Job::Execution.create!({ :root_jobnet_id => @root.id, }) @root.phase_key = :running @root.save! EM.run_block do tengine.receive(i, :properties => { :original_event => { :event_type_name => "start.job.job.tengine", :properties => { :execution_id => @execution.id.to_s, :root_jobnet_id => @root.id.to_s, :root_jobnet_name_path => @root.name_path, :target_jobnet_id => @root.id.to_s, :target_jobnet_name_path => @root.name_path, :target_job_id => @root.children[1].id.to_s, }}}) end @root.reload @root.children[1].phase_key.should == :stuck @root.phase_key.should_not == :stuck # running end end end end