# -*- coding: utf-8 -*- require 'tengine/job' require 'tengine/resource/net_ssh' # ジョブとして実際にスクリプトを実行する処理をまとめるモジュール。 # Tengine::Job::JobnetActualと、Tengine::Job::ScriptActualがincludeします module Tengine::Job::ScriptExecutable extend ActiveSupport::Concern class Error < StandardError end included do include Tengine::Core::CollectionAccessible field :executing_pid, :type => String # 実行しているプロセスのPID field :exit_status , :type => String # 終了したプロセスが返した終了ステータス field :error_messages, :type => Array # エラーになった場合のメッセージを保持する配列。再実行時に追加される場合は末尾に追加されます。 array_text_accessor :error_messages, :delimeter => "\n" end def run(execution) return ack(@acked_pid) if @acked_pid cmd = build_command(execution) # puts "cmd:\n" << cmd execute(cmd) do |ch, data| if signal = execution.signal # signal.data = {:executing_pid => data.strip} # ack(signal) pid = data.strip signal.callback = lambda do signal.data = {:executing_pid => pid} # このブロック内の処理はupdate_with_lockによって複数回実行されることがあります。 # 1回目と同じリロードされていないオブジェクトを2回目以降に使用すると、1回目の変更が残っているので # そのオブジェクトに対して処理を行うのはNGです。 # self.ack(signal) # これはNG # このブロックが実行されるたびに、rootからselfと同じidのオブジェクトを新たに取得する必要があります。 job = root.vertex(self.id) job.ack(signal) end end end end def execute(cmd) raise "actual_server not found for #{self.name_path.inspect}" unless actual_server Tengine.logger.info("connecting to #{actual_server.hostname_or_ipv4}") port = actual_server.properties["ssh_port"] || 22 keys_only = actual_credential.auth_type_cd == :ssh_public_key Net::SSH.start(actual_server.hostname_or_ipv4, actual_credential, :port => port, :logger => Tengine.logger, :keys_only => keys_only) do |ssh| # see http://net-ssh.github.com/ssh/v2/api/classes/Net/SSH/Connection/Channel.html ssh.open_channel do |channel| Tengine.logger.info("now exec on ssh: " << cmd) channel.exec(cmd.force_encoding("binary")) do |ch, success| raise Error, "could not execute command" unless success channel.on_close do |ch| # puts "channel is closing!" end channel.on_data do |ch, data| Tengine.logger.debug("got stdout: #{data}") yield(ch, data) if block_given? end channel.on_extended_data do |ch, type, data| self.error_messages ||= [] self.error_messages += [data] raise Error, "Failure to execute #{self.name_path} via SSH: #{data}" end end end end rescue Tengine::Job::ScriptExecutable::Error raise rescue Mongoid::Errors::DocumentNotFound, SocketError, Net::SSH::AuthenticationFailed => src error = Error.new("[#{src.class.name}] #{src.message}") error.set_backtrace(src.backtrace) raise error rescue Exception # puts "[#{$!.class.name}] #{$!.message}" raise end def kill(execution) lines = source_profiles if self.executing_pid.blank? Tengine.logger.warn("PID is blank when kill!!\n#{self.inspect}\n " << caller.join("\n ")) end cmd = executable_command("tengine_job_agent_kill %s %d %s" % [ self.executing_pid, self.actual_killing_signal_interval, self.actual_killing_signals.join(","), ]) lines << cmd cmd = lines.join(' && ') execute(cmd) end # def ack(pid) # @acked_pid = pid # self.executing_pid = pid # self.phase_key = :running # self.previous_edges.each{|edge| edge.status_key = :transmitted} # end def build_command(execution) result = source_profiles mm_env = build_mm_env(execution).map{|k,v| "#{k}=#{v}"}.join(" ") # Hadoopジョブの場合は環境変数をセットする if is_a?(Tengine::Job::Jobnet) && (jobnet_type_key == :hadoop_job_run) mm_env << ' ' << hadoop_job_env end result << "export #{mm_env}" template_root = root_or_expansion.template if template_root template_job = template_root.vertex_by_name_path(self.name_path_until_expansion) unless template_job raise "job not found #{self.name_path_until_expansion.inspect} in #{template_root.inspect}" end key = Tengine::Job::DslLoader.template_block_store_key(template_job, :preparation) preparation_block = Tengine::Job::DslLoader.template_block_store[key] if preparation_block preparation = instance_eval(&preparation_block) unless preparation.blank? result << preparation end end end unless execution.preparation_command.blank? result << execution.preparation_command end # cmdはユーザーが設定したスクリプトを組み立てたもので、 # プロセスの監視/強制停止のためにtengine_job_agent/bin/tengine_job_agent_run # からこれらを実行させるためにはcmdを編集します。 # tengine_job_agent_runは、標準出力に監視対象となる起動したプロセスのPIDを出力します。 runner_path = ENV["MM_RUNNER_PATH"] || executable_command("tengine_job_agent_run") runner_option = "" # 実装するべきか要検討 # runner_option << " --stdout" if execution.keeping_stdout # runner_option << " --stderr" if execution.keeping_stderr # script = "#{runner_path}#{runner_option} -- #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください script = "#{runner_path}#{runner_option} #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください result << script result.join(" && ") end def source_profiles # RubyのNet::SSHでは設定ファイルが読み込まれないので、ロードするようにします。 # ~/.bash_profile, ~/.bashrc などは非対応。 # ファイルが存在していたらsourceで読み込むようにしたいのですが、一旦保留します。 # http://www.syns.net/10/ ["/etc/profile", "/etc/bashrc", "$HOME/.bashrc", "$HOME/.bash_profile"].map do |path| "if [ -f #{path} ]; then source #{path}; fi" end end def executable_command(command) if prefix = ENV["MM_CMD_PREFIX"] "#{prefix} #{command}" else command end end # MMから実行されるシェルスクリプトに渡す環境変数のHashを返します。 # MM_ACTUAL_JOB_ID : 実行される末端のジョブのMM上でのID # MM_ACTUAL_JOB_ANCESTOR_IDS : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (テンプレートジョブ単位) # MM_FULL_ACTUAL_JOB_ANCESTOR_IDS : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (expansionから展開した単位) # MM_ACTUAL_JOB_NAME_PATH : 実行される末端のジョブのname_path # MM_ACTUAL_JOB_SECURITY_TOKEN : 公開API呼び出しのためのセキュリティ用のワンタイムトークン # MM_TEMPLATE_JOB_ID : テンプレートジョブ(=実行される末端のジョブの元となったジョブ)のID # MM_TEMPLATE_JOB_ANCESTOR_IDS : テンプレートジョブの祖先のMM上でのIDをセミコロンで繋げたもの # MM_SCHEDULE_ID : 実行スケジュールのID # MM_SCHEDULE_ESTIMATED_TIME : 実行スケジュールの見積り時間。単位は分。 # MM_SCHEDULE_ESTIMATED_END : 実行スケジュールの見積り終了時刻をYYYYMMDDHHMMSS式で。(できればISO 8601など、タイムゾーンも表現できる標準的な形式の方が良い?) # MM_MASTER_SCHEDULE_ID : マスタースケジュールがあればそのID。マスタースケジュールがない場合は環境変数は指定されません。 # # 未実装 # MM_FAILED_JOB_ID : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブのMM上でのID。 # MM_FAILED_JOB_ANCESTOR_IDS : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブの祖先のMM上でのIDをセミコロンで繋げた文字列。 def build_mm_env(execution) result = { "MM_SERVER_NAME" => actual_server_name, # [Tengineの仕様として追加] ジョブの実行サーバ名を設定 "MM_ROOT_JOBNET_ID" => root.id.to_s, "MM_TARGET_JOBNET_ID" => parent.id.to_s, "MM_ACTUAL_JOB_ID" => id.to_s, "MM_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors_until_expansion.map(&:id).map(&:to_s).join(';'), "MM_FULL_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors.map(&:id).map(&:to_s).join(';'), "MM_ACTUAL_JOB_NAME_PATH" => name_path.dump, "MM_ACTUAL_JOB_SECURITY_TOKEN" => "", # TODO トークンの生成 "MM_SCHEDULE_ID" => execution.id.to_s, "MM_SCHEDULE_ESTIMATED_TIME" => execution.estimated_time, } if estimated_end = execution.actual_estimated_end result["MM_SCHEDULE_ESTIMATED_END"] = estimated_end.strftime("%Y%m%d%H%M%S") end if rjt = root.template t = rjt.find_descendant_by_name_path(self.name_path) unless t template_name_parts = self.name_path_until_expansion.split(Tengine::Job::NamePath::SEPARATOR).select{|s| !s.empty?} root_jobnet_name = template_name_parts.first if rjt = Tengine::Job::RootJobnetTemplate.find_by_name(root_jobnet_name, :version => rjt.dsl_version) t = rjt.find_descendant_by_name_path(self.name_path_until_expansion) raise "template job #{path.inspect} not found in #{rjt.inspect}" unless t else raise "Tengine::Job::RootJobnetTemplate not found #{self.name_path_until_expansion.inspect}" end end result.update({ "MM_TEMPLATE_JOB_ID" => t.id.to_s, "MM_TEMPLATE_JOB_ANCESTOR_IDS" => '"%s"' % t.ancestors.map(&:id).map(&:to_s).join(';'), }) end # if ms = execution.master_schedule # result.update({ # "MM_MASTER_SCHEDULE_ID" => ms.id.to_s, # }) # end result end def hadoop_job_env s = children.select{|c| c.is_a?(Tengine::Job::Jobnet) && (c.jobnet_type_key == :hadoop_job)}. map{|c| "#{c.name}\\t#{c.id.to_s}\\n"}.join "MM_HADOOP_JOBS=\"#{s}\"" end end