module TSV
  def self.obj_stream(obj)
    case obj
    when IO, File
      obj
    when TSV::Dumper
      obj.stream
    when TSV::Parser
      obj.stream
    end
  end

  def self.stream_name(obj)
    filename_obj   = obj.respond_to?(:filename) ? obj.filename : nil
    filename_obj ||= obj.respond_to?(:path) ? obj.path : nil
    stream_obj = obj_stream(obj)
    filename_obj.nil? ? stream_obj.inspect : filename_obj + "(#{stream_obj.inspect})"
  end

  def self.report(msg, obj, into)
    into = into[:into] if Hash === into and into.include? :into

    Log.error "#{ msg } #{stream_name(obj)} -> #{stream_name(into)}"
  end

  def self.traverse_tsv(tsv, options = {}, &block)
    callback = Misc.process_options options, :callback

    if callback
      tsv.through options[:key_field], options[:fields] do |k,v|
        callback.call yield(k,v)
      end
    else
      tsv.through options[:key_field], options[:fields] do |k,v|
        yield k,v 
      end
    end
  end

  def self.traverse_hash(hash, options = {}, &block)
    callback = Misc.process_options options, :callback

    if callback
      hash.each do |k,v|
        callback.call yield(k,v)
      end
    else
      hash.each do |k,v|
        yield k,v 
      end
    end
  end

  def self.traverse_array(array, options = {}, &block)
    callback = Misc.process_options options, :callback

    if callback
      array.each do |e|
        res = yield(e)
        callback.call res
      end
    else
      array.each do |e|
        yield e
      end
    end
  end

  def self.traverse_io_array(io, options = {}, &block)
    callback = Misc.process_options options, :callback
    if callback
      while line = io.gets
        res = yield line.strip
        callback.call res
      end
    else
      while line = io.gets
        yield line.strip
      end
    end
  end

  def self.traverse_io(io, options = {}, &block)
    filename = io.filename if io.respond_to? :filename
    callback = Misc.process_options options, :callback
    begin
      if callback
        TSV::Parser.traverse(io, options) do |k,v|
          res = yield k, v
          callback.call res
        end
      else
        TSV::Parser.traverse(io, options, &block)
      end
    rescue
      Log.error "Traverse IO error"
      raise $!
    end
  end

  def self.traverse_obj(obj, options = {}, &block)
    filename = obj.filename if obj.respond_to? :filename
    if options[:type] == :keys
      options[:fields] = []
      options[:type] = :single
    end

    case obj
    when TSV
      traverse_tsv(obj, options, &block)
    when Hash
      traverse_hash(obj, options, &block)
    when TSV::Parser
      callback = Misc.process_options options, :callback
      if callback
        obj.traverse(options) do |k,v|
          res = yield k, v
          callback.call res
        end
      else
        obj.traverse(options, &block)
      end
    when IO, File
      begin
        if options[:type] == :array
          traverse_io_array(obj, options, &block)
        else
          traverse_io(obj, options, &block)
        end
      rescue Exception
        obj.abort if obj.respond_to? :abort
        raise $!
      ensure
        obj.close if obj.respond_to? :close and not obj.closed?
        obj.join if obj.respond_to? :join
      end
    when Path
      obj.open do |stream|
        traverse_obj(stream, options, &block)
      end
    when TSV::Dumper
      traverse_obj(obj.stream, options, &block)
    when (defined? Step and Step)

      stream = obj.get_stream

      if stream
        traverse_obj(stream, options, &block)
      else
        obj.join
        traverse_obj(obj.path, options, &block)
      end
    when Array
      traverse_array(obj, options, &block)
    when nil
      raise "Can not traverse nil object into #{stream_name(options)}"
    else
      raise "Unknown object for traversal: #{Misc.fingerprint obj }"
    end
  end

  def self.traverse_threads(num, obj, options, &block)
    callback = Misc.process_options options, :callback

    q = RbbtThreadQueue.new num

    if callback
      block = Proc.new do |k,v,mutex|
        v, mutex = nil, v if mutex.nil?
        res = yield k, v, mutex
        mutex.synchronize do
          callback.call res 
        end
      end
    end

    q.init true, &block

    traverse_obj(obj, options) do |*p|
      q.process p
    end

    q.join
    q.clean
    nil
  end

  def self.traverse_cpus(num, obj, options, &block)
    begin
      filename = obj.respond_to?(:filename)? obj.filename : "none"
      callback, cleanup = Misc.process_options options, :callback, :cleanup
      q = RbbtProcessQueue.new num, cleanup

      q.callback &callback
      q.init &block

      thread = Thread.new do 
        traverse_obj(obj, options) do |*p|
          q.process *p
        end
      end

      thread.join
    rescue Exception
      Log.error "Exception traversing in cpus: #{$!.message}"
      Log.exception $!

      stream = obj_stream(obj)
      stream.abort if stream.respond_to? :abort
      stream = obj_stream(options[:into])
      stream.abort if stream.respond_to? :abort
      q.abort
      raise $!
    ensure
      q.join
    end
  end

  def self.store_into(store, value)
    begin
      case store
      when Hash
        return false if value.nil?
        if Hash === value
          if TSV === store and store.type == :double
            store.merge_zip value
          else
            store.merge! value
          end
        else
          k,v = value
          store[k] = v
        end
      when TSV::Dumper
        return false if value.nil?
        store.add *value
      when IO
        return false if value.nil?
        value.strip!
        store.puts value
      else
        store << value
      end 
      true
    rescue
      raise "Error storing into #{store.inspect}: #{$!.message}"
    end
  end

  def self.get_streams_to_close(obj)
    close_streams = []
    case obj
    when IO, File
      close_streams << obj
    when TSV::Parser
    when TSV::Dumper
      close_streams << obj.result.in_stream
    when (defined? Step and Step)
      obj.mutex.synchronize do
        case obj.result
        when IO
          close_streams << obj.result
        when TSV::Dumper
          close_streams << obj.result.in_stream
        end
      end
      obj.inputs.each do |input|
        close_streams = get_streams_to_close(input) + close_streams
      end
      obj.dependencies.each do |dependency|
        close_streams = get_streams_to_close(dependency) + close_streams
      end
    end 
    close_streams
  end

  def self.traverse_run(obj, threads, cpus, options = {}, &block)
    if threads.nil? and cpus.nil? 
      traverse_obj obj, options, &block
    else
      if threads
        traverse_threads threads, obj, options, &block 
      else
        close_streams = Misc.process_options(options, :close_streams) || []
        close_streams = [close_streams] unless Array === close_streams

        close_streams.concat(get_streams_to_close(obj))
        options[:close_streams] = close_streams

        options[:cleanup] = Proc.new do
          close_streams.uniq.each do |s|
            s.close unless s.closed?
          end
        end if close_streams and close_streams.any?

        traverse_cpus cpus, obj, options, &block
      end
    end
  end

  def self.traverse_stream(obj, threads, cpus, options, &block)
    into = options[:into]
    thread = Thread.new(Thread.current, obj) do |parent,obj|
      begin
        traverse_run(obj, threads, cpus, options, &block)
        into.close if into.respond_to? :close
      rescue Exception
        Log.exception $!
        parent.raise $!
        stream = obj_stream(into)
        stream.abort if stream and stream.respond_to? :abort
      end
    end
    ConcurrentStream.setup(obj_stream(into), :threads => thread)
  end

  def self.traverse(obj, options = {}, &block)
    threads = Misc.process_options options, :threads
    cpus = Misc.process_options options, :cpus
    into = options[:into]

    threads = nil if threads and threads.to_i <= 1
    cpus = nil if cpus and cpus.to_i <= 1

    if into == :stream
      sout = Misc.open_pipe false, false do |sin|                                                                                                                                           
        begin
          traverse(obj, options.merge(:into => sin), &block)                                                                                                                                  
        rescue Exception
          sout.abort if sout.respond_to? :abort
          sout.join if sout.respond_to? :join
        end
      end                                                                                                                                                                                   
      return sout
    end

    if into
      options[:callback] = Proc.new do |e|
        begin
          store_into into, e
        rescue Exception
          Log.exception $!
          raise $!
        end
      end

      case into
      when TSV::Dumper, IO
        traverse_stream(obj, threads, cpus, options, &block)
      else
        traverse_run(obj, threads, cpus, options, &block)
        into.close if into.respond_to? :close
      end

      into
    else
      traverse_run(obj, threads, cpus, options, &block)
    end
  end
end