class Step

  attr_reader :stream

  STREAM_CACHE = {}
  STREAM_CACHE_MUTEX = Mutex.new
  def self.dup_stream(stream)
    case stream
    when IO, File
      return stream if stream.closed?
      STREAM_CACHE_MUTEX.synchronize do
        if STREAM_CACHE[stream].nil?
          STREAM_CACHE[stream] = stream
        else
          Misc.dup_stream(STREAM_CACHE[stream])
        end
      end
    when TSV::Dumper, TSV::Parser
      stream = stream.stream
      return stream if stream.closed?

      STREAM_CACHE_MUTEX.synchronize do
        if STREAM_CACHE[stream].nil?
          STREAM_CACHE[stream] = stream
        else
          Misc.dup_stream(STREAM_CACHE[stream])
        end
      end
    else
      stream
    end
  end

  def self.purge_stream_cache
    return
    STREAM_CACHE_MUTEX.synchronize do
      STREAM_CACHE.collect{|k,s| 
        Thread.new do
          Misc.consume_stream s
        end
      }
      STREAM_CACHE.clear
    end
  end

  def get_stream
    @mutex.synchronize do
      @stream = begin
        IO === @result ? @result : nil
      ensure
        @result = nil
      end
    end
  end

  def dup_inputs
    @inputs.collect do |input|
      Step.dup_stream input
    end
  end

  def _exec
    @exec = true if @exec.nil?
    @task.exec_in((bindings ? bindings : self), *dup_inputs)
  end

  def exec(no_load=false)
    dependencies.each{|dependency| dependency.exec(no_load) }
    @result = self._exec
    @result = @result.stream if TSV::Dumper === @result
    no_load ? @result : prepare_result(@result, @task.result_description)
  end

  def checks
    rec_dependencies.collect{|dependency| dependency.path }.uniq
  end

  def kill_children
    children_pids = info[:children_pids]
    if children_pids and children_pids.any?
      Log.medium("Killing children: #{ children_pids * ", " }")
      children_pids.each do |pid|
        Log.medium("Killing child #{ pid }")
        begin
          Process.kill "INT", pid
        rescue Exception
          Log.medium("Exception killing child #{ pid }: #{$!.message}")
        end
      end
    end
  end

  def run_dependencies(seen = [])
    seen << self.path
    dependencies.uniq.each{|dependency| 
      next if seen.include? dependency.path
      Log.info "#{Log.color :magenta, "Checking dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""} -- #{Log.color :blue, dependency.path}"
      begin
        dependency.relay_log self
        dependency.clean if not dependency.done? and (dependency.error? or dependency.aborted?)
        dependency.clean if dependency.streaming? and not dependency.running?
        dependency.run(ENV["RBBT_NO_STREAM"] != 'true') unless dependency.result or dependency.done?
        seen << dependency.path
        seen.concat dependency.rec_dependencies.collect{|d| d.path} 
      rescue Exception
        backtrace = $!.backtrace
        set_info :backtrace, backtrace 
        log(:error, "Exception processing dependency #{Log.color :yellow, dependency.task.name.to_s} -- #{$!.class}: #{$!.message}")
        raise $!
      end
    }
  end

  def run(no_load = false)

    result = nil
    begin
      @mutex.synchronize do
        no_load = no_load ? :stream : false
        result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load do |lockfile|
          if Step === Step.log_relay_step and not self == Step.log_relay_step
            relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
          end
          @exec = false

          Open.rm info_file if Open.exists? info_file

          set_info :pid, Process.pid
          set_info :issued, Time.now

          log(:preparing, "Preparing job: #{Misc.fingerprint dependencies}")
          set_info :dependencies, dependencies.collect{|dep| [dep.task_name, dep.name]}

          run_dependencies

          set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

          set_info :started, (start_time = Time.now)
          log :started, "#{Log.color :green, "Starting task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"

          begin
            result = _exec
          rescue Aborted
            log(:error, "Aborted")

            kill_children
            raise $!
          rescue Exception
            backtrace = $!.backtrace

            # HACK: This fixes an strange behaviour in 1.9.3 where some
            # backtrace strings are coded in ASCII-8BIT
            kill_children
            set_info :backtrace, backtrace 
            log(:error, "#{$!.class}: #{$!.message}")
            backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
            raise $!
          end

          if not no_load or ENV["RBBT_NO_STREAM"] == "true" 
            result = prepare_result result, @task.description, info if IO === result 
            result = prepare_result result.stream, @task.description, info if TSV::Dumper === result 
          end

          case result
          when IO

            log :streaming, "#{Log.color :magenta, "Streaming task result IO"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
            ConcurrentStream.setup result do
              begin
                set_info :done, (done_time = Time.now)
                set_info :time_elapsed, (time_elapsed = done_time - start_time)
                log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
              rescue
                Log.exception $!
              ensure
                join
              end
            end
            result.abort_callback = Proc.new do
              begin
                log :error, "#{Log.color :red, "ERROR -- streamming aborted"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] -- #{path}" if status == :streaming
                stop_dependencies
                abort_stream
              rescue
                Log.exception $!
              ensure
                join
              end
            end
          when TSV::Dumper
            log :streaming, "#{Log.color :magenta, "Streaming task result TSV::Dumper"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}]"
            ConcurrentStream.setup result.stream do
              begin
                set_info :done, (done_time = Time.now)
                set_info :done, (done_time = Time.now)
                set_info :time_elapsed, (time_elapsed = done_time - start_time)
                log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i} -- #{path}"
              rescue
                Log.exception $!
              ensure
                join
              end
            end
            result.stream.abort_callback = Proc.new do
              begin
                log :error, "#{Log.color :red, "ERROR -- streamming aborted"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] -- #{path}"  if status == :streaming
                stop_dependencies
                abort_stream
              rescue
                Log.exception $!
              ensure
                join
              end
            end
          else
            set_info :done, (done_time = Time.now)
            set_info :time_elapsed, (time_elapsed = done_time - start_time)
            log :done, "#{Log.color :red, "Completed task"} #{Log.color :yellow, task.name.to_s || ""} [#{Process.pid}] +#{time_elapsed.to_i}"
          end

          result
        end

        if no_load
          @result ||= result
          self
        else
          @result = prepare_result result, @task.result_description
        end
      end
    ensure
      join unless no_load
    end
  end

  def fork(semaphore = nil)
    raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil? and not Process.pid == @pid and Misc.pid_exists?(@pid) and not done? and info[:forked]
    @pid = Process.fork do
      begin
        RbbtSemaphore.wait_semaphore(semaphore) if semaphore
        FileUtils.mkdir_p File.dirname(path) unless Open.exists? File.dirname(path)
        begin
          res = run true
        rescue Aborted
          Log.debug{"Forked process aborted: #{path}"}
          log :aborted, "Aborted"
          raise $!
        rescue Exception
          Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
          raise $!
        ensure
          join_stream
        end

        begin
          children_pids = info[:children_pids]
          if children_pids
            children_pids.each do |pid|
              if Misc.pid_exists? pid
                begin
                  Process.waitpid pid
                rescue Errno::ECHILD
                  Log.low "Waiting on #{ pid } failed: #{$!.message}"
                end
              end
            end
            set_info :children_done, Time.now
          end
        rescue Exception
          Log.debug("Exception waiting for children: #{$!.message}")
          exit -1
        end
        set_info :pid, nil
        exit 0
      ensure
        RbbtSemaphore.post_semaphore(semaphore) if semaphore
      end
    end
    set_info :forked, true
    Process.detach(@pid)
    self
  end

  def stop_dependencies
    dependencies.each do |dep|
      dep.abort unless dep.done?
    end
  end

  def abort_pid
    @pid ||= info[:pid]

    case @pid
    when nil
      Log.medium "Could not abort #{path}: no pid"
      false
    when Process.pid
      Log.medium "Could not abort #{path}: same process"
      false
    else
      Log.medium "Aborting #{path}: #{ @pid }"
      begin
        Process.kill("KILL", @pid)
        Process.waitpid @pid
      rescue Exception
        Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
      end
      true
    end
  end

  def abort_stream
    stream = get_stream if @result
    stream ||= @stream 
    if stream
      stream.abort if stream.respond_to? :abort
    end
  end

  def abort
    begin
      abort_pid
      stop_dependencies
      abort_stream
    ensure
      log(:aborted, "Job aborted")
    end
  end

  def join_stream
    stream = get_stream if @result
    if stream
      begin
        Misc.consume_stream stream
        stream.join if stream.respond_to? :join # and not stream.joined?
      rescue Exception
        stream.abort if stream.respond_to? :abort 
        self.abort
        raise $!
      end
    end
  end

  def join

    join_stream

    return self if not Open.exists? info_file

    return self if info[:joined]
    pid = @pid 

    Misc.insist [0.1, 0.2, 0.5, 1] do
      pid ||= info[:pid]
    end

    begin
      if pid.nil?
        dependencies.each{|dep| dep.join }
        self
      else
        begin
          Log.debug{"Waiting for pid: #{pid}"}
          Process.waitpid pid 
        rescue Errno::ECHILD
          Log.debug{"Process #{ pid } already finished: #{ path }"}
        end if Misc.pid_exists? pid
        pid = nil
        dependencies.each{|dep| dep.join }
        self
      end
    ensure
      set_info :joined, true
    end
    self
  end
end