#!/usr/bin/env ruby

require 'rbbt/util/simpleopt'
require 'rbbt/workflow'
require 'rbbt/workflow/usage'
require 'time'

def report_options(options)
  if options.nil? or options.empty?
    puts "No options"
  else
    options.each do |key, value|
      puts [Log.color(:cyan, key), Misc.fingerprint(value)] * ": "
    end
  end
end

def usage(workflow = nil, task = nil, exception=nil)
  puts SOPT.doc
  puts
  if workflow.nil?
    puts "No workflow specified. Use `rbbt workflow list` to list available workflows."
    exit -1 
  end

  if task.nil?
    workflow.load_tasks if workflow.respond_to? :load_tasks
    workflow.doc
    puts
    puts "E.g. rbbt workflow task #{workflow.to_s} #{workflow.tasks.keys.first.to_s} -h"
  else
    puts Log.color :magenta, workflow.to_s
    puts Log.color :magenta, "=" * workflow.to_s.length
    if workflow.documentation[:title] and not workflow.documentation[:title].empty?
      puts
      puts Misc.format_paragraph workflow.documentation[:title] 
    end
    if workflow.documentation[:description] and not workflow.documentation[:description].empty?
      puts
      puts Misc.format_paragraph workflow.documentation[:description] 
    end
    puts
    workflow.doc(task)
  end

  print_error(exception.message, exception.backtrace) if exception

  true
end

def SOPT_options(workflow, task)
  sopt_options = []
  workflow.rec_inputs(task.name).each do |name|
    short = name.to_s.chars.first
    boolean = workflow.rec_input_types(task.name)[name].to_sym == :boolean
    
    sopt_options << "-#{short}--#{name}#{boolean ? "" : "*"}"
  end

  sopt_options * ":"
end

def get_value_stream(value)
  if value == "-"
    io = Misc.open_pipe do |sin|
      while not STDIN.eof? 
        sin.write STDIN.read(2048)
      end
      sin.close
    end
  else
    io = Open.open(value)
  end
  class << io
    attr_accessor :filename
  end
  io.filename = value
  io
end

def fix_options(workflow, task, job_options)
  input_types = IndiferentHash.setup workflow.rec_input_types(task.name)
  input_options = IndiferentHash.setup workflow.rec_input_options(task.name)

  job_options_cleaned = {}

  job_options.each do |name, value|
    value = case input_types[name].to_sym
            when :boolean
              TrueClass === value or %w(true TRUE T yes).include? value
            when :float
              value.to_f
            when :integer
              value.to_i
            when :text
              if input_options[name] and input_options[name][:stream] and String === value
                get_value_stream(value)
              else
                case
                when value == '-'
                  STDIN.read
                when (String === value and File.exist?(value) and not File.directory?(value))
                  Open.read(value)
                else
                  value
                end
              end
            when :array
              if input_options[name] and input_options[name][:stream] and String === value and Misc.is_filename?(value) and not input_options[name][:nofile]
                get_value_stream(value)
              elsif input_options[name] and input_options[name][:stream] and value == "-"
                STDIN
              else
                if Array === value
                  value
                else
                  array_separator = $array_separator
                  str = case
                        when value == '-'
                          array_separator ||= "\n"
                          STDIN.read
                        when (String === value and File.exist?(value) and not (input_options[name] && input_options[name][:nofile]))
                          array_separator ||= "\n"
                          Open.read(value)
                        else
                          value
                        end

                  if array_separator
                    str.split(/#{array_separator}/)
                  else
                    str.split(/[,|\s]/)
                  end
                end
              end
            when :tsv
              if input_options[name] and input_options[name][:stream] and String === value
                TSV::Parser.new(value == '-' ? STDIN : Open.open(value), :filename => value )
              else
                case value
                when TSV
                  value
                when '-'
                  TSV.open(STDIN, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
                when (Misc.is_filename?(value) and String)
                  TSV.open(value, :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
                else
                  TSV.open(StringIO.new(value), :unnamed => true, :sep => $field_separator, :sep2 => ($array_separator || "|"))
                end
              end
            when :directory
              Path.setup(File.expand_path(value))
            else
              value
            end
    job_options_cleaned[name] = value
  end

  job_options_cleaned
end

options = SOPT.setup <<EOF
Enact workflow tasks

$ rbbt workflow task <workflow> [<task>] [<options>]

Examine workflows and enact tasks from them. If no `task` is specified, a list
of available tasks is shown. If a `task` is given it will be enacted with the
parameters specified in `options`. Use *-h* option to display the description
of a task, including the parameters it accepts; and some examples, if
available. Examples can be enacted using `rbbt workflow example <workflow>
[<task>] [<example>]`.

When a task is enacted a job is instantiated. This job is identified by the
`jobname` (which is *Default* unless specified otherwise) and the values of the
parameters. If the same taks is enacted using the same `jobname` and
parameters, then the same job will be the same. 

The first time a job is executed it will save the result. Once the job is done
you can re-doit using the `clean` parameter. The `recursive_clean` cleans all
the job dependencies recursively.

-h--help Show this help
-wd--workdir* Change the working directory of the workflow
-as--array_separator* Change the character that separates elements of Arrays, ',', '|', or '\\n' by default
-fs--field_separator* Change the character that separates fields of TSV files '\\t' by default
-jn--jobname* Job name to use. The name 'Default' is used by default
-pn--printname Print the name of the job and exit without starting it
-pf--printpath Print the path of the job result
-cl--clean Clean the last step of the job so that it gets recomputed
-ct--clean_task* Clean a particular dependency task
-rcl--recursive_clean Clean the last step and its dependencies to recompute the job completely
--fork Run job asyncronously and monitor progress. It monitors detached processes as well
--detach Run job asyncronously and detach process
--exec Run job with no persistence
-O--output* Save job result into file
-jf--job_file* Output one of the job produced files
-ljf--list_job_files List all the files produced in that step
--load_inputs* Load inputs from a directory
--info Show the job info
--provenance Report the jobs provenance
-W--workflows* Load a list of workflows
-R--requires* Require a list of files
-rwt--remote_workflow_tasks* Load a yaml file describing remote workflow tasks
-od--override_deps* Override deps using 'Workflow#task=<path>' array_separated
EOF

workflow = ARGV.shift
usage and exit -1 if workflow.nil?

task     = ARGV.shift

# Set log, fork, clean, recursive_clean and help
help = !!options.delete(:help)
do_fork = !!options.delete(:fork)
detach = !!options.delete(:detach)
do_exec = !!options.delete(:exec)
clean = !!options.delete(:clean)
clean_task = options.delete(:clean_task)
override_deps = options.delete(:override_deps)
recursive_clean = !!options.delete(:recursive_clean)
out = options.include?(:output) ? File.open(options[:output], 'wb') : STDOUT

$array_separator = options.delete(:array_separator)
$field_separator = options.delete(:field_separator) || "\t"

# Get workflow

if Rbbt.etc.remote_workflows.exists?
  remote_workflows = Rbbt.etc.remote_workflows.yaml
else
  remote_workflows = {}
end

workflow = Workflow.require_workflow workflow

if options[:workflows]
  require 'rbbt/workflow'
  workflows = options[:workflows].split(',')
  workflows.each do |workflow|
    workflow.strip!
    Workflow.require_workflow workflow
  end
end

if options[:requires]
  requires = options[:requires].split(',')
  requires.each do |req|
    req.strip!
    require req
  end
end

if options[:remote_workflow_tasks]
  Workflow.load_remote_tasks(options[:remote_workflow_tasks])
end

# Set task
namespace = nil, nil

case 
when task.nil?
  usage workflow and exit 0
else
  task_name = task.to_sym
  begin
    task = workflow.tasks[task_name]
    raise Workflow::TaskNotFoundException.new workflow, task_name if task.nil?
  rescue Workflow::TaskNotFoundException
    usage workflow 

    puts
    puts Log.color :magenta, "## Error"
    puts
    puts $!.message
    puts
    
    exit 0
  end
end

usage workflow, task and exit 0 if help

name = options.delete(:jobname) 

# get job args
sopt_option_string = SOPT_options(workflow, task)
if options[:load_inputs]
  task_info = workflow.task_info(task_name)
  job_options = Workflow.load_inputs(options[:load_inputs], task_info[:inputs], task_info[:input_types]).merge(SOPT.get(sopt_option_string))
else
  job_options = SOPT.get sopt_option_string
end
job_options = fix_options(workflow, task, job_options)
saved_job_options = job_options

workflow.workdir = Path.setup(File.expand_path(options.delete(:workdir))) if options[:workdir]

if override_deps
  override_deps.split($array_separator).each do |part|
    t_, value = part.split("=")
    job_options.merge!( t_ => value)
  end
end

#- get job

job = workflow.job(task.name, name, job_options)

# clean job
if clean 
  job.clean 
  sleep 1
end

if clean_task
  job.rec_dependencies.each do |dep|
    next unless dep.task_name.to_s == clean_task.to_s
    dep.clean 
    dep.set_info :status, :cleaned
  end
end

if recursive_clean 
  job.recursive_clean 
end

require 'pp'

# run
begin
  if options[:info]
    pp job.info
    exit 0
  end

  if do_exec or (job.respond_to?(:is_exec) and job.is_exec)
    res = job.exec(:stream)

    result_type = job.result_type

    res = JSON.parse(res.read) if (defined?(WorkflowRESTClient) and WorkflowRESTClient::RemoteStep === job) && %w(array float integer boolean).include?(result_type.to_s)

    case
    when res.respond_to?(:gets)
      begin
        Misc.consume_stream(res, false, out)
      rescue EOFError, IOError
      end
      res.join if res.respond_to? :join
    when Array === res
      out.puts res * "\n"
    when TSV === res
      out.puts res
    when Hash === res
      out.puts res.to_yaml
    when IO === res
      while block = res.read(2048)
        out.write block
      end
    else
      out.puts res
    end
    exit 0
  end

  if do_fork
    ENV["RBBT_NO_PROGRESS"] = "true"
    if detach
      job.fork
      Process.detach job.pid if job.pid
      puts Log.color(:magenta, "Issued: ") + Log.color(:magenta, job.pid ? job.pid.to_s : 'no pid') + ' -- ' + job.path
      exit 0
    end

    job.fork
  else
    job.run(:stream)
    res = job
  end


  if options.delete(:provenance)
    job.join
    pp job.provenance_paths
    exit 0
  end

  if options.delete(:printname)
    job.join if IO === job.result
    puts job.name
    exit 0
  end

  if options.delete(:printpath)
    job.join
    raise job.messages.last if job.error?
    if Open.remote? job.path
      puts job.url + Log.color(:blue, "?_format=raw")
    else
      puts job.path
    end
    exit 0
  end

  if do_fork
    puts
    space = 1
    Log.tty_size ||= 100

    while not job.done?
      message = (job.messages and job.messages.any?) ? job.messages.last.strip : "no message"
      status = job.status || "no status"
      if job.info and job.info.include? :issued
        issued = job.info[:issued]
        issued = Time.parse(issued) unless Time === issued
        time = Time.now - issued
      end

      space.times do
        Log.clear_line 
      end

      puts "#{Log.color :blue, job.path}"
      str = "Waiting on #{Log.color :blue, job.info[:pid] || job.pid} (#{time ? time.to_i : '?'} sec. ago) " << [Log.color(:cyan, status.to_s),message.strip].compact*" "
      puts Misc.format_paragraph str, Log.tty_size

      space = 2 + Log.uncolor(str).length / Log.tty_size
      sleep 2
    end
    raise job.messages.last if job.error?

    if job.info and job.info.include? :issued
      issued = job.info[:issued]
      issued = Time.parse(issued) unless Time === issued
      time = Time.now - issued
    end

    space.times do
      Log.clear_line 
    end

    if Open.remote? job.path
      out.puts job.path + Log.color(:blue, "?_format=raw")
    else
      out.puts job.path
    end

    exit 0
  end
rescue ParameterException
  SOPT.delete_inputs(workflow.rec_inputs(task.name))
  usage(workflow, task, $!)
  puts Log.color :magenta, "Options:"
  puts
  report_options saved_job_options
  puts
  exit -1
end

if options.delete(:list_job_files)
  out.puts job.files * "\n"
  exit 0
end

if job_file = options.delete(:job_file)
  job.join
  file = job.file(job_file)
  out.puts Path === file ? file.read : file
  exit 0
end

case res
when (defined?(WorkflowRESTClient) and WorkflowRESTClient::RemoteStep)
  res = job.result
  if res.respond_to? :gets
    begin
      Misc.consume_stream(res, false, out)
    rescue EOFError, IOError
    end
    res.join if res.respond_to? :join
  elsif res.nil?
    job.join
    puts Open.read(job.path, :nocache => true, :nofail => true)
  else
    if Array === res
      out.puts res * "\n"
    else
      out.puts res.to_s
    end
  end
when Step
  if res.streaming?
    io = TSV.get_stream res
    Misc.consume_stream(io, false, out)
    io.join if io.respond_to? :join
  elsif IO === res.result
    begin
      io = res.get_stream
      Misc.consume_stream(io, false, out)
      io.join if io.respond_to? :join
    rescue Aborted, Interrupt
      Log.error "Process interrupted. Aborting step"
      res.abort
      begin
        io.abort if io.respond_to? :abort
        io.join  if io.respond_to? :join
      ensure
        exit -1
      end
    rescue Exception
      Log.exception $!
      res.abort
      begin
        io.abort if io.respond_to? :abort
        io.join  if io.respond_to? :join
      ensure
        exit -1
      end
    end
  else
    res.join
    out.puts Open.read(res.path) if File.exist? res.path
  end
else
  if Array === res
    out.puts res * "\n"
  else
    out.puts res.to_s
  end
end

exit 0