require 'rbbt/persist'
require 'rbbt/persist/tsv'
require 'rbbt/util/log'
require 'rbbt/util/semaphore'
require 'rbbt/workflow/accessor'


# A unit of work in a workflow: it ties a task to concrete inputs, a result
# path and the steps it depends on, and knows how to execute, persist, fork,
# join, abort and clean itself.
#
# Several methods used below (+info+, +set_info+, +info_file+, +files_dir+,
# +log+, +status=+, +message+, +name+, +done?+, +error?+, +running?+,
# +streaming?+) are not defined in this class body; presumably they are
# provided by 'rbbt/workflow/accessor' -- confirm there.
class Step
  attr_accessor :task, :inputs, :dependencies, :bindings
  attr_accessor :pid
  attr_accessor :result, :mutex
  # Only writers are generated for these two: their readers are the explicit
  # #path and #exec methods defined below.
  attr_writer :path, :exec

  # path::         String (sanitized and set up as a Path), a Proc that
  #                produces the path (resolved lazily by #path), or any
  #                other object, which is stored untouched.
  # task::         the task this step runs (may be nil).
  # inputs::       positional inputs for the task; defaults to [].
  # dependencies:: nil, a single step or an Array of steps; always stored
  #                as an Array.
  # bindings::     optional object in whose context the task is executed
  #                instead of the step itself (see #_exec).
  def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil)
    path = Path.setup(Misc.sanitize_filename(path)) if String === path
    # A Proc path is kept as-is here; #path calls it once, on first use.
    @path = path
    @task = task
    @bindings = bindings
    @dependencies = case dependencies
                    when nil
                      []
                    when Array
                      dependencies
                    else
                      [dependencies]
                    end
    @mutex = Mutex.new
    @info_mutex = Mutex.new
    @inputs = inputs || []
  end

  # Name of the underlying task.
  def task_name
    @task.name
  end

  # Returns the result path. If the path was given as a Proc it is called the
  # first time this method runs, and the sanitized, Path-extended outcome
  # replaces the Proc (same processing order as in the constructor).
  def path
    @path = Path.setup(Misc.sanitize_filename(@path.call)) if Proc === @path
    @path
  end

  class << self
    # Step that every other running step relays its log entries to (see #run).
    attr_accessor :log_relay_step
  end

  # Makes this step forward every #log call to +step+ as well, prefixing the
  # status with this step's task name ("task>status"). The singleton +log+
  # override is installed only once per object; later calls just replace the
  # relay target. Does nothing unless the step has a named Task.
  #
  # Returns self.
  def relay_log(step)
    return self unless Task === self.task and not self.task.name.nil?
    if not self.respond_to? :original_log
      class << self
        attr_accessor :relay_step
        alias original_log log
        def log(status, message = nil)
          self.status = status
          # Explicit parentheses: this calls the +message+ method with the
          # local +message+ as its argument.
          message(message)
          relay_step.log([task.name.to_s, status.to_s] * ">", message)
        end
      end
    end
    @relay_step = step
    self
  end

  # Turns a raw task result into the value handed back to callers.
  #
  # * IO values are read completely according to the task's result type
  #   (:array => array of lines, :tsv => TSV.open, anything else => the whole
  #   content as read by +read+), and the stream is closed (and joined when
  #   it supports it).
  # * Values whose +description+ is not a known Entity format are returned
  #   untouched.
  # * Annotated values get the entries of +info+ that match their existing
  #   annotations assigned.
  # * Any other value is set up as an entity of the described format.
  #
  # NOTE(review): an exception raised while reading an IO is swallowed after
  # aborting the stream, so the method then returns the outcome of the abort
  # call (or nil) -- confirm this is intended.
  def prepare_result(value, description = nil, info = {})
    case
    when IO === value
      begin
        case @task.result_type
        when :array
          array = []
          while line = value.gets
            array << line
          end
          array
        when :tsv
          TSV.open(value)
        else
          value.read
        end
      rescue Exception
        value.abort if value.respond_to? :abort
      ensure
        value.close unless value.closed?
        value.join if value.respond_to? :join
      end
    when (!defined?(Entity) || description.nil? || !Entity.formats.include?(description))
      value
    when (Annotated === value and info.empty?)
      value
    when Annotated === value
      annotations = value.annotations
      info.each do |k,v|
        value.send("#{k}=", v) if annotations.include? k
      end
      value
    else
      Entity.formats[description].setup(value, info.merge(:format => description))
    end
  end

  # Hands over the pending result stream, if the current result is an IO,
  # and clears the stored result either way. Returns the IO or nil.
  def get_stream
    @mutex.synchronize do
      begin
        IO === @result ? @result : nil
      ensure
        @result = nil
      end
    end
  end

  # Runs the task body in the context of +bindings+ (or of the step itself
  # when there are none) with the step inputs. No persistence, no logging.
  def _exec
    @exec = true if @exec.nil?
    @task.exec_in((bindings ? bindings : self), *@inputs)
  end

  # Executes the dependencies and then the task without persisting anything.
  # With +no_load+ the raw result (possibly a stream) is returned; otherwise
  # it goes through #prepare_result.
  def exec(no_load=false)
    dependencies.each{|dependency| dependency.exec(no_load) }
    @result = _exec
    @result = @result.stream if TSV::Dumper === @result
    no_load ? @result : prepare_result(@result, @task.result_description)
  end

  # Waits for the step to finish: consumes any pending result stream, waits
  # for the forked process (if there is one) and then joins every dependency.
  #
  # Returns self.
  def join
    stream = get_stream
    begin
      Misc.consume_stream stream if stream
    rescue
      stream.abort if stream.respond_to? :abort
      raise $!
    ensure
      stream.join if stream.respond_to? :join and not stream.joined?
    end

    unless @pid.nil?
      if Misc.pid_exists? @pid
        begin
          Log.debug{"Waiting for pid: #{@pid}"}
          Process.waitpid @pid
        rescue Errno::ECHILD
          Log.debug{"Process #{ @pid } already finished: #{ path }"}
        end
      end
      @pid = nil
    end

    dependencies.each{|dep| dep.join }
    self
  end

  # Paths of all recursive dependencies, passed to Persist as the :check
  # option in #run.
  def checks
    rec_dependencies.collect{|dependency| dependency.path }.uniq
  end

  # Runs the step through Persist.persist, using #path as the target file:
  # records bookkeeping info, runs the dependencies that have no result yet,
  # executes the task and logs the outcome. The whole run happens while
  # holding the step mutex.
  #
  # With +no_load+ the result is stored in @result (unless one is already
  # there) and self is returned; otherwise the prepared result is returned.
  def run(no_load = false)

    @mutex.synchronize do
      result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load ? :stream : false do
        if Step === Step.log_relay_step and not self == Step.log_relay_step
          relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
        end
        @exec = false

        Open.rm info_file if Open.exists? info_file

        set_info :pid, Process.pid
        set_info :issued, Time.now

        log(:preparing, "Preparing job: #{Misc.fingerprint dependencies}")
        set_info :dependencies, dependencies.collect{|dep| [dep.task_name, dep.name]}
        # Paths already covered by the recursive dependencies of a step
        # processed earlier in this loop; those are not processed again.
        seen_deps = []
        dependencies.uniq.each{|dependency|
          Log.info "#{Log.color :magenta, "Checking dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""}"
          begin
            next if seen_deps.include? dependency.path
            dependency.relay_log self
            dependency.clean if not dependency.done? and dependency.error?
            dependency.clean if dependency.streaming? and not dependency.running?
            dependency.run true unless dependency.result
            seen_deps.concat dependency.rec_dependencies.collect{|d| d.path}
          rescue Exception
            backtrace = $!.backtrace
            set_info :backtrace, backtrace
            log(:error, "Exception processing dependency #{Log.color :yellow, dependency.task.name.to_s} -- #{$!.class}: #{$!.message}")
            raise $!
          end
        }

        set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

        set_info :started, (start_time = Time.now)
        log :started, "#{Log.color :green, "Starting task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"

        begin
          result = _exec
        rescue Aborted
          log(:error, "Aborted")

          # Interrupt the child processes registered through #child
          children_pids = info[:children_pids]
          if children_pids and children_pids.any?
            Log.medium("Killing children: #{ children_pids * ", " }")
            children_pids.each do |pid|
              Log.medium("Killing child #{ pid }")
              begin
                Process.kill "INT", pid
              rescue Exception
                Log.medium("Exception killing child #{ pid }: #{$!.message}")
              end
            end
          end

          raise $!
        rescue Exception
          backtrace = $!.backtrace

          # HACK: This fixes a strange behaviour in 1.9.3 where some
          # backtrace strings are coded in ASCII-8BIT
          backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding

          set_info :backtrace, backtrace
          log(:error, "#{$!.class}: #{$!.message}")
          raise $!
        end

        # For streamed results the completion bookkeeping is handed to
        # ConcurrentStream.setup as a block instead of being run right away.
        case result
        when IO
          log :streaming, "#{Log.color :magenta, "Streaming task result IO"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
          ConcurrentStream.setup result do
            begin
              set_info :done, (done_time = Time.now)
              set_info :time_elapsed, (time_elapsed = done_time - start_time)
              log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
            rescue
              Log.exception $!
            end
          end
        when TSV::Dumper
          log :streaming, "#{Log.color :magenta, "Streaming task result TSV::Dumper"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
          ConcurrentStream.setup result.stream do
            begin
              set_info :done, (done_time = Time.now)
              set_info :time_elapsed, (time_elapsed = done_time - start_time)
              log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
            rescue
              Log.exception $!
            end
          end
        else
          set_info :done, (done_time = Time.now)
          set_info :time_elapsed, (time_elapsed = done_time - start_time)
          log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i}"
        end

        result
      end

      if no_load
        @result ||= result
        self
      else
        @result = prepare_result result, @task.result_description
      end
    end
  end

  # Runs the step in a forked process. The child optionally waits on
  # +semaphore+ first, runs the step, consumes its result stream, waits for
  # the children registered through #child and exits; the semaphore is
  # posted when the child leaves. The parent flags the step as forked,
  # detaches the child and returns self.
  #
  # Raises if the step is already tracking a pid.
  def fork(semaphore = nil)
    raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil?
    @pid = Process.fork do
      begin
        RbbtSemaphore.wait_semaphore(semaphore) if semaphore
        FileUtils.mkdir_p File.dirname(path) unless Open.exists? File.dirname(path)
        begin
          run(true)
          io = get_stream
          if IO === io
            Misc.consume_stream(io)
            io.close unless io.closed?
            io.join if io.respond_to? :join and not io.joined?
          end
        rescue Aborted
          Log.debug{"Forked process aborted: #{path}"}
          log :aborted, "Aborted"
          raise $!
        rescue Exception
          Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
          raise $!
        end

        begin
          children_pids = info[:children_pids]
          if children_pids
            children_pids.each do |pid|
              if Misc.pid_exists? pid
                begin
                  Process.waitpid pid
                rescue Errno::ECHILD
                  Log.low "Waiting on #{ pid } failed: #{$!.message}"
                end
              end
            end
            set_info :children_done, Time.now
          end
        rescue Exception
          Log.debug("Exception waiting for children: #{$!.message}")
          exit(-1)
        end
        set_info :pid, nil
        exit 0
      ensure
        RbbtSemaphore.post_semaphore(semaphore) if semaphore
      end
    end
    set_info :forked, true
    Process.detach(@pid)
    self
  end

  # Kills the process running this step.
  #
  # Returns true when the step was not forked (nothing to kill) or after the
  # kill was attempted, in which case the step is logged as :aborted.
  # Returns false, without logging an abort, when no pid is known or the pid
  # is the current process.
  def abort
    @pid ||= info[:pid]

    return true unless info[:forked]

    case @pid
    when nil
      Log.medium "Could not abort #{path}: no pid"
      false
    when Process.pid
      Log.medium "Could not abort #{path}: same process"
      false
    else
      Log.medium "Aborting #{path}: #{ @pid }"
      begin
        Process.kill("KILL", @pid)
        Process.waitpid @pid
      rescue Exception
        Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
      end
      log(:aborted, "Job aborted by user")
      true
    end
  end

  # Forks +block+ as a child process and registers its pid under the
  # :children_pids info entry. Returns the child pid.
  def child(&block)
    child_pid = Process.fork(&block)
    children_pids = info[:children_pids]
    if children_pids.nil?
      children_pids = [child_pid]
    else
      children_pids << child_pid
    end
    #Process.detach(child_pid)
    set_info :children_pids, children_pids
    child_pid
  end

  # Returns the step result: the in-memory one when available, otherwise the
  # persisted file (after joining a step that is not done), and as a last
  # resort the outcome of executing the step without persistence.
  def load
    # Go through #path so that a Proc path is resolved before it is used
    result_path = path
    return prepare_result @result, @task.result_description if @result and not result_path == @result
    join if not done?
    return Persist.load_file(result_path, @task.result_type) if result_path.exists?
    exec
  end

  # Removes the result file, the info file, their lock files and the files
  # directory, aborting the step first if it is running. Only acts when the
  # result or the info file exists. Returns self.
  def clean
    if Open.exists?(path) or Open.exists?(info_file)
      begin
        self.abort if self.running?
      rescue Exception
        # Best effort: a failed abort does not prevent the clean-up below
      end

      @result = nil
      @pid = nil

      Open.rm info_file if Open.exists? info_file
      Open.rm info_file + '.lock' if Open.exists? info_file + '.lock'
      Open.rm path if Open.exists? path
      Open.rm path + '.lock' if Open.exists? path + '.lock'
      Open.rm_rf files_dir if Open.exists? files_dir
    end
    self
  end

  # All dependencies of this step, direct ones first and then those found
  # recursively, without duplicates. The step's own dependency list is not
  # modified.
  def rec_dependencies

    # A step result with no info_file means that it was manually
    # placed. In that case, do not consider its dependencies
    return [] if Open.exists?(self.path.to_s) and not Open.exists? self.info_file

    return [] if dependencies.nil? or dependencies.empty?
    nested = dependencies.collect{|step|
      step.rec_dependencies
    }.flatten.uniq.compact

    # Array#+ builds a new array, so the in-place calls below are safe
    all_dependencies = dependencies + nested
    all_dependencies.flatten!
    all_dependencies.uniq!
    all_dependencies
  end

  # Cleans every recursive dependency that has an info file and then the
  # step itself. Returns self.
  def recursive_clean
    rec_dependencies.each do |step|
      # File.exist? rather than File.exists?, which Ruby 3.2 removed
      step.clean if File.exist?(step.info_file)
    end
    clean
  end

  # First recursive dependency whose task is called +name+ (String or
  # Symbol), or nil. Successful lookups are cached per name.
  def step(name)
    @steps ||= {}
    @steps[name] ||= rec_dependencies.find do |step|
      step.task_name.to_sym == name.to_sym
    end
  end
end