# -*- coding: utf-8 -*-
require 'tengine/job/runtime'

require 'tengine/resource/net_ssh'

# ルートジョブネットを他のジョブネット内に展開するための特殊なテンプレート用Vertex。
class Tengine::Job::Runtime::SshJob < Tengine::Job::Runtime::JobBase

  class Error < StandardError
  end

  include Tengine::Core::CollectionAccessible
  include Tengine::Job::Template::SshJob::Settings

  include Tengine::Job::Runtime::StateTransition

  field :executing_pid, :type => String # 実行しているプロセスのPID
  field :exit_status  , :type => String # 終了したプロセスが返した終了ステータス
  field :error_messages, :type => Array # エラーになった場合のメッセージを保持する配列。再実行時に追加される場合は末尾に追加されます。
  array_text_accessor :error_messages, :delimeter => "\n"

  before_validation :prepare_server_and_credential

  def prepare_server_and_credential
    if t = template_vertex
      self.server_name = t.actual_server_name if server_name.blank?
      self.credential_name = t.actual_credential_name if credential_name.blank?
    end
  end

  def run(execution)
    return ack(@acked_pid) if @acked_pid
    cmd = build_command(execution)
    # puts "cmd:\n" << cmd
    execute(cmd) do |ch, data|
      pids = data.strip.scan(/^\d+$/)
      case pids.length
      when 0 then
        add_error_message("expected a set of numeric charactors but not found in: " << data.inspect)
        raise Error, "Failure to execute #{self.name_path} via SSH. expected numeric charactors but got: #{data}"
      when 1 then
        pid = pids.first.strip
        Tengine.logger.info("got pid: #{pid.inspect}")
      else
        add_error_message("expected a set of numeric charactors but got #{pids.length} sets of numeric charactoers #{pids.inspect} in #{data.inspect}")
        raise Error, "Failure to execute #{self.name_path} via SSH. expected numeric charactors but got: #{data}"
      end

      if signal = execution.signal
        signal.call_later do
          signal.data = {:executing_pid => pid}

          # このブロック内の処理はupdate_with_lockによって複数回実行されることがあります。
          # 1回目と同じリロードされていないオブジェクトを2回目以降に使用すると、1回目の変更が残っているので
          # そのオブジェクトに対して処理を行うのはNGです。
          # self.ack(signal) # これはNG

          # このブロックが実行されるたびに、rootからselfと同じidのオブジェクトを新たに取得する必要があります。
          job = root.vertex(self.id)
          job.ack(signal)
        end
      end
    end
  rescue Exception => e
    Tengine.logger.error("[#{e.class}] #{e.message}\n  " << e.backtrace.join("\n  "))
    raise
  end

  class ShellClient
    def initialize(channel, script, callback)
      @channel, @script, @callback = channel, script, callback
      @status = :preparing # :preparing, :waiting, :exiting
    end

    def setup
      @data = ""
      @result = nil

      @channel.on_data do |ch, data|
        # puts "on_data: #{data.inspect}"
        @data << data
        Tengine.logger.info("got STDOUT data: #{data.inspect}")
      end

      @channel.on_process do |ch|
        while @data =~ %r!^.*?\n!
          @data = $'
          dispatch($&)
        end
      end
    end

    def dispatch(line)
      # puts "line: #{line.inspect}"
      case @status
      when :preparing then execute
      when :waiting then
        if line.strip == one_time_token
          returns
        else
          @result << line
        end
      when :exiting then
        # do nothing...
      else
        raise Error, "Unknown shell channel status: #{@status.inspect}"
      end
    end

    def start
      prepare # execute, returnsはdispatchから呼ばれます
    end

    def prepare
      cmd = "export PS1=;"
      Tengine.logger.info("now exec on ssh: \"#{cmd}\"")
      @channel.send_data("#{cmd}\n")
    end

    def execute
      actual = @script.force_encoding("binary")
      Tengine.logger.info("now exec on ssh: " << @script)
      # puts("now exec on ssh: " << @script)
      @result = ""
      @status = :waiting
      @channel.send_data(actual + "; echo \"#{one_time_token}\"\n")
    end

    def returns
      @callback.call(@channel, @result) if @callback
      @status = :exiting
      @channel.send_data("exit\n")
    end

    def one_time_token
      "one_time_token"
    end
  end

  def execute(cmd, &block)
    raise "actual_server not found for #{self.name_path.inspect}" unless actual_server
    Tengine.logger.info("connecting to #{actual_server.hostname_or_ipv4}")
    port = actual_server.properties["ssh_port"] || 22
    keys_only = actual_credential.auth_type_cd == :ssh_public_key
    Net::SSH.start(actual_server.hostname_or_ipv4, actual_credential, :port => port, :logger => Tengine.logger, :keys_only => keys_only) do |ssh|
      # see http://net-ssh.github.com/ssh/v2/api/classes/Net/SSH/Connection/Channel.html
      c = ssh.open_channel do |ch0|
        ch0.request_pty do |channel, success|
          raise Error, "failed to request_pty" unless success

          channel.exec("#{ENV['SHELL']} -l") do |shell_ch, success|
            raise Error, "failed to \"#{ENV['SHELL']} -l\"" unless success

            shell_ch.on_extended_data do |ch, type, data|
              add_error_message(data)
              raise Error, "Failure to execute #{self.name_path} via SSH: #{data}"
            end

            client = ShellClient.new(shell_ch, cmd, block)
            shell_ch[:client] = client
            client.setup
            client.start
          end
        end
      end
      c.wait
    end
  rescue Tengine::Job::Runtime::SshJob::Error
    raise
  rescue Mongoid::Errors::DocumentNotFound, SocketError, Net::SSH::AuthenticationFailed => src
    error = Error.new("[#{src.class.name}] #{src.message}")
    error.set_backtrace(src.backtrace)
    raise error
  rescue Exception
    # puts "[#{$!.class.name}] #{$!.message}"
    raise
  end

  def kill(execution)
    lines = []

    if self.executing_pid.blank?
      Tengine.logger.warn("PID is blank when kill!!\n#{self.inspect}\n  " << caller.join("\n  "))
    end

    cmd = executable_command("tengine_job_agent_kill %s %d %s" % [
        self.executing_pid,
        self.actual_killing_signal_interval,
        self.actual_killing_signals.join(","),
      ])
    lines << cmd
    cmd = lines.join(' && ')
    execute(cmd)
  end

#   def ack(pid)
#     @acked_pid = pid
#     self.executing_pid = pid
#     self.phase_key = :running
#     self.previous_edges.each{|edge| edge.status_key = :transmitted}
#   end

  def build_command(execution)
    result = []
    mm_env = build_mm_env(execution).map{|k,v| "#{k}=#{v}"}.join(" ")
    # Hadoopジョブの場合は環境変数をセットする
    if is_a?(Tengine::Job::Runtime::Jobnet) && (jobnet_type_key == :hadoop_job_run)
      mm_env << ' ' << hadoop_job_env
    end
    result << "export #{mm_env}"
    template_root = (parent ? root_or_expansion.template : nil)
    if template_root
      template_job = template_root.vertex_by_name_path(self.name_path_until_expansion)
      unless template_job
        raise "job not found #{self.name_path_until_expansion.inspect} in #{template_root.inspect}"
      end
      key = Tengine::Job::Dsl::Loader.template_block_store_key(template_job, :preparation)
      preparation_block = Tengine::Job::Dsl::Loader.template_block_store[key]
      if preparation_block
        preparation = instance_eval(&preparation_block)
        unless preparation.blank?
          result << preparation
        end
      end
    end
    unless execution.preparation_command.blank?
      result << execution.preparation_command
    end
    # cmdはユーザーが設定したスクリプトを組み立てたもので、
    # プロセスの監視/強制停止のためにtengine_job_agent/bin/tengine_job_agent_run
    # からこれらを実行させるためにはcmdを編集します。
    # tengine_job_agent_runは、標準出力に監視対象となる起動したプロセスのPIDを出力します。
    runner_path = ENV["MM_RUNNER_PATH"] || executable_command("tengine_job_agent_run")
    runner_option = ""
    # 実装するべきか要検討
    # runner_option << " --stdout" if execution.keeping_stdout
    # runner_option << " --stderr" if execution.keeping_stderr
    # script = "#{runner_path}#{runner_option} -- #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください
    script = "#{runner_path}#{runner_option} #{self.script}" # runnerのオプションを指定する際は -- の前に設定してください
    result << script
    result.join(" && ")
  end

  def executable_command(command)
    if prefix = ENV["MM_CMD_PREFIX"]
      "#{prefix} #{command}"
    else
      command
    end
  end

  # MMから実行されるシェルスクリプトに渡す環境変数のHashを返します。
  # MM_ACTUAL_JOB_ID                : 実行される末端のジョブのMM上でのID
  # MM_ACTUAL_JOB_ANCESTOR_IDS      : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (テンプレートジョブ単位)
  # MM_FULL_ACTUAL_JOB_ANCESTOR_IDS : 実行される末端のジョブの祖先のMM上でのIDをセミコロンで繋げた文字列 (expansionから展開した単位)
  # MM_ACTUAL_JOB_NAME_PATH         : 実行される末端のジョブのname_path
  # MM_ACTUAL_JOB_SECURITY_TOKEN    : 公開API呼び出しのためのセキュリティ用のワンタイムトークン
  # MM_TEMPLATE_JOB_ID              : テンプレートジョブ(=実行される末端のジョブの元となったジョブ)のID
  # MM_TEMPLATE_JOB_ANCESTOR_IDS    : テンプレートジョブの祖先のMM上でのIDをセミコロンで繋げたもの
  # MM_SCHEDULE_ID                  : 実行スケジュールのID
  # MM_SCHEDULE_ESTIMATED_TIME      : 実行スケジュールの見積り時間。単位は分。
  # MM_SCHEDULE_ESTIMATED_END       : 実行スケジュールの見積り終了時刻をYYYYMMDDHHMMSS式で。(できればISO 8601など、タイムゾーンも表現できる標準的な形式の方が良い?)
  # MM_MASTER_SCHEDULE_ID           : マスタースケジュールがあればそのID。マスタースケジュールがない場合は環境変数は指定されません。
  #
  # 未実装
  # MM_FAILED_JOB_ID                : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブのMM上でのID。
  # MM_FAILED_JOB_ANCESTOR_IDS      : ジョブが失敗した場合にrecoverやfinally内のジョブを実行時に設定される、失敗したジョブの祖先のMM上でのIDをセミコロンで繋げた文字列。
  def build_mm_env(execution)
    result = {
      "MM_SERVER_NAME" => actual_server_name,  # [Tengineの仕様として追加] ジョブの実行サーバ名を設定
      "MM_ROOT_JOBNET_ID" => root.id.to_s,
      "MM_TARGET_JOBNET_ID" => (parent ? parent.id.to_s : nil),
      "MM_ACTUAL_JOB_ID" => id.to_s,
      "MM_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors_until_expansion.map(&:id).map(&:to_s).join(';'),
      "MM_FULL_ACTUAL_JOB_ANCESTOR_IDS" => '"%s"' % ancestors.map(&:id).map(&:to_s).join(';'),
      "MM_ACTUAL_JOB_NAME_PATH" => name_path.dump,
      "MM_ACTUAL_JOB_SECURITY_TOKEN" => "", # TODO トークンの生成
      "MM_SCHEDULE_ID" => execution.id.to_s,
      "MM_SCHEDULE_ESTIMATED_TIME" => execution.estimated_time,
    }
    if estimated_end = execution.actual_estimated_end
      result["MM_SCHEDULE_ESTIMATED_END"] = estimated_end.strftime("%Y%m%d%H%M%S")
    end
    if rjt = (parent ? root.template : nil)
      t = rjt.find_descendant_by_name_path(self.name_path)
      unless t
        template_name_parts = self.name_path_until_expansion.split(Tengine::Job::Structure::NamePath::SEPARATOR).select{|s| !s.empty?}
        root_jobnet_name = template_name_parts.first
        if rjt = Tengine::Job::Template::RootJobnet.find_by_name(root_jobnet_name, :version => rjt.dsl_version)
          t = rjt.find_descendant_by_name_path(self.name_path_until_expansion)
          raise "template job #{name_path.inspect} not found in #{rjt.inspect}" unless t
        else
          raise "Tengine::Job::Template::RootJobnet not found #{self.name_path_until_expansion.inspect}"
        end
      end
      result.update({
          "MM_TEMPLATE_JOB_ID" => t.id.to_s,
          "MM_TEMPLATE_JOB_ANCESTOR_IDS" => '"%s"' % t.ancestors.map(&:id).map(&:to_s).join(';'),
      })
    end
    # if ms = execution.master_schedule
    #   result.update({
    #       "MM_MASTER_SCHEDULE_ID" => ms.id.to_s,
    #   })
    # end
    result
  end

  def hadoop_job_env
    s = children.select{|c| c.is_a?(Tengine::Job::Runtime::Jobnet) && (c.jobnet_type_key == :hadoop_job)}.
      map{|c| "#{c.name}\\t#{c.id.to_s}\\n"}.join
    "MM_HADOOP_JOBS=\"#{s}\""
  end

  def add_error_message(msg)
    self.error_messages ||= []
    self.error_messages += [msg]
  end


  ## 状態遷移アクション

  # ハンドリングするドライバ: ジョブネット制御ドライバ
  def transmit(signal)
    self.phase_key = :ready
    self.started_at = signal.event.occurred_at
    signal.fire(self, :"start.job.job.tengine", {
        :target_jobnet_id => parent.id,
        :target_jobnet_name_path => parent.name_path,
        :target_job_id => self.id,
        :target_job_name_path => self.name_path,
      })
  end
  available(:transmit, :on => :initialized,
    :ignored => [:ready, :starting, :running, :dying, :success, :error, :stuck])

  # ハンドリングするドライバ: ジョブ制御ドライバ
  def activate(signal)
    case phase_key
    when :initialized then
      # 特別ルール「starting直前stop」
      # initializedに戻されたジョブに対して、:readyになる際にtransmitで送信されたイベントを受け取って、
      # activateしようとすると状態は遷移しないが、後続のエッジを実行する。
      # (エッジを実行しようとした際、エッジがclosedならばそのジョブネットのEndに遷移する。)
      next_edges.first.transmit(signal)
    when :ready then
      self.phase_key = :starting
      self.started_at = signal.event.occurred_at

      signal.call_later do
        complete_origin_edge(signal)
        execution = signal.execution
        if execution.retry
          if execution.target_actual_ids.include?(self.id.to_s)
            execution.ack(signal)
          elsif execution.target_actuals.map{|t| t.parent.id.to_s if t.parent }.include?(self.parent.id.to_s)
            # 自身とTengine::Job::Runtime::Execution#target_actual_idsに含まれるジョブ/ジョブネットと親が同じならば、ackしない
          else
            parent.ack(signal)
          end
        else
          parent.ack(signal) # 再実行でない場合
        end
        # このコールバックはjob_control_driverでupdate_with_lockの外側から
        # 再度呼び出してもらうためにcallbackを設定しています
        signal.call_later do
          # 実際にSSHでスクリプトを実行
          execution = signal.execution
          execution.signal = signal # ackを呼び返してもらうための苦肉の策
          begin
            run(execution)
          rescue Tengine::Job::Runtime::SshJob::Error => e
            Tengine.logger.warn("error on run\nerror: #{e.inspect}\njob: #{self.inspect}\nexecution: #{execution.inspect}")
            signal.call_later do
              self.fail(signal, :message => e.message)
            end
          end
        end
      end
    when :starting then
      raise "something wrong! #{self.inspect}"
    end
  end
  available(:activate, :on => [:initialized, :ready, :starting],
    :ignored => [:running, :dying, :success, :error, :stuck])

  # ハンドリングするドライバ: ジョブ制御ドライバ
  # スクリプトのプロセスのPIDを取得できたときに実行されます
  def ack(signal)
    self.executing_pid = (signal.data || {})[:executing_pid]
    self.phase_key = :running
  end
  available(:ack, :on => :starting,
    :ignored => [:running, :dying, :success, :error, :stuck])

  def finish(signal)
    self.exit_status = signal.event[:exit_status]
    self.finished_at = signal.event.occurred_at
    (self.exit_status.to_s == '0') ?
    succeed(signal) :
      fail(signal)
  end

  # ハンドリングするドライバ: ジョブ制御ドライバ
  def succeed(signal)
    self.phase_key = :success
    self.finished_at = signal.event.occurred_at
    signal.fire(self, :"success.job.job.tengine", {
        :exit_status => self.exit_status,
        :target_jobnet_id => parent.id,
        :target_jobnet_name_path => parent.name_path,
        :target_job_id => self.id,
        :target_job_name_path => self.name_path,
      })
  end
  available :succeed, :on => [:starting, :running, :dying, :stuck], :ignored => [:success]

  # ハンドリングするドライバ: ジョブ制御ドライバ
  def fail(signal, options = nil)
    self.phase_key = :error
    if msg = signal.event[:message]
      add_error_message(msg)
    end
    if options && (msg = options[:message])
      add_error_message(msg)
    end
    self.finished_at = signal.event.occurred_at
    event_options = {
      :exit_status => self.exit_status,
      :target_jobnet_id => parent.id,
      :target_jobnet_name_path => parent.name_path,
      :target_job_id => self.id,
      :target_job_name_path => self.name_path,
    }
    event_options.update(options) if options
    signal.fire(self, :"error.job.job.tengine", event_options)
  end
  available :fail, :on => [:starting, :running, :dying], :ignored => [:error, :stuck]

  def fire_stop(signal)
    signal.fire(self, :"stop.job.job.tengine", {
        :stop_reason => signal.event[:stop_reason],
        :target_jobnet_id => parent.id,
        :target_jobnet_name_path => parent.name_path,
        :target_job_id => self.id,
        :target_job_name_path => self.name_path,
      })
  end
  available :fire_stop, :on => [:ready, :starting, :running], :ignored => [:initialized, :dying, :success, :error, :stuck]

  def stop(signal, &block)
    case phase_key
    when :ready then
      self.phase_key = :initialized
      self.stopped_at = signal.event.occurred_at
      self.stop_reason = signal.event[:stop_reason]
      next_edges.first.transmit(signal)
    when :starting then
      job = nil
      loop do
        # root = self.root.reload # class.find(self.root.id)
        # job = root.find_descendant(self.id)
        job = self.class.find(self.id)
        break unless job.phase_key == :starting
        yield if block_given? # テストの為にyieldしています
        sleep(0.1)
      end
      job.stop(signal, &block)
    when :running then
      self.phase_key = :dying
      self.stopped_at = signal.event.occurred_at
      self.stop_reason = signal.event[:stop_reason]
      signal.call_later do
        kill(signal.execution)
      end
    end
  end
  available :stop, :on => [:ready, :starting, :running], :ignored => [:initialized, :dying, :success, :error, :stuck]

  def reset(signal, &block)
    self.phase_key = :initialized
    reset_followings(signal)
  end
  available :reset, :on => [:initialized, :ready, :success, :error, :stuck]

end