require 'rbbt'

module Misc
  class << self
    attr_accessor :sensiblewrite_lock_dir
    
    def sensible_write_locks
      @sensiblewrite_locks ||= Rbbt.tmp.sensiblewrite_locks.find
    end
  end

  class << self
    attr_accessor :sensiblewrite_dir
    def sensiblewrite_dir
      @sensiblewrite_dir = Rbbt.tmp.sensiblewrite.find
    end
  end

  PIPE_MUTEX = Mutex.new

  OPEN_PIPE_IN = []
  def self.pipe
    OPEN_PIPE_IN.delete_if{|pipe| pipe.closed? }
    PIPE_MUTEX.synchronize do
      sout, sin = IO.pipe
      OPEN_PIPE_IN << sin

      [sout, sin]
    end
  end
  
  def self.release_pipes(*pipes)
    PIPE_MUTEX.synchronize do
      pipes.flatten.each do |pipe|
        pipe.close unless pipe.closed?
      end
    end
  end

  def self.purge_pipes(*save)
    PIPE_MUTEX.synchronize do
      OPEN_PIPE_IN.each do |pipe|
        next if save.include? pipe
        pipe.close unless pipe.closed?
      end
    end
  end

  def self.open_pipe(do_fork = false, close = true)
    raise "No block given" unless block_given?

    sout, sin = Misc.pipe

    if do_fork
      parent_pid = Process.pid
      pid = Process.fork {
        purge_pipes(sin)
        sout.close
        begin
          yield sin
          sin.close if close and not sin.closed? 
        rescue
          Log.exception $!
          Process.kill :INT, parent_pid
          Kernel.exit! -1
        end
        Kernel.exit! 0
      }
      sin.close 
      ConcurrentStream.setup sout, :pids => [pid]
    else
      thread = Thread.new(Thread.current) do |parent|
        begin
          yield sin
          sin.close if close and not sin.closed?
        rescue Aborted
          Log.medium "Aborted open_pipe: #{$!.message}"
        rescue Exception
          Log.medium "Exception in open_pipe: #{$!.message}"
          parent.raise $!
          raise $!
        end
      end
      ConcurrentStream.setup sout, :threads => [thread]
    end
    sout
  end

  def self.tee_stream_thread(stream)
    stream_out1, stream_in1 = Misc.pipe
    stream_out2, stream_in2 = Misc.pipe

    if ConcurrentStream === stream 
      stream.annotate stream_out1
    end

    splitter_thread = Thread.new(Thread.current) do |parent|
      begin
        skip1 = skip2 = false
        while block = stream.read(2048)
          begin 
            stream_in1.write block; 
          rescue IOError
            Log.medium("Tee stream 1 #{Misc.fingerprint stream} IOError: #{$!.message}");
            skip1 = true
          end unless skip1 

          begin 
            stream_in2.write block
          rescue IOError
            Log.medium("Tee stream 2 #{Misc.fingerprint stream} IOError: #{$!.message}");
            skip2 = true
          end unless skip2 
        end
        stream_in1.close unless stream_in1.closed?
        stream.join if stream.respond_to? :join
        stream_in2.close unless stream_in2.closed?
      rescue Aborted, Interrupt
        stream_out1.abort if stream_out1.respond_to? :abort
        stream.abort if stream.respond_to? :abort
        stream_out2.abort if stream_out2.respond_to? :abort
        Log.medium "Tee aborting #{Misc.fingerprint stream}"
        raise $!
      rescue Exception
        stream_out1.abort if stream_out1.respond_to? :abort
        stream.abort if stream.respond_to? :abort
        stream_out2.abort if stream_out2.respond_to? :abort
        Log.medium "Tee exception #{Misc.fingerprint stream}"
        raise $!
      end
    end

    ConcurrentStream.setup stream_out1, :threads => splitter_thread
    ConcurrentStream.setup stream_out2, :threads => splitter_thread

    stream_out1.callback = stream.callback if stream.respond_to? :callback
    stream_out1.abort_callback = stream.abort_callback if stream.respond_to? :abort_callback

    stream_out2.callback = stream.callback if stream.respond_to? :callback
    stream_out2.abort_callback = stream.abort_callback if stream.respond_to? :abort_callback

    [stream_out1, stream_out2]
  end

  class << self
    alias tee_stream tee_stream_thread 
  end

  def self.read_full_stream(io)
    str = ""
    begin
      while block = io.read(2048)
        str << block
      end
      io.join if io.respond_to? :join
    rescue
      io.abort if io.respond_to? :abort
    end
    str
  end

  def self.consume_stream(io, in_thread = false, into = nil)
    return if Path === io
    return unless io.respond_to? :read 
    if io.respond_to? :closed? and io.closed?
      io.join if io.respond_to? :join
      return
    end

    if in_thread
      Thread.new do
        consume_stream(io, false)
      end
    else
      Log.medium "Consuming stream #{Misc.fingerprint io}"
      begin
        into.sync == true if IO === into
        while not io.closed? and block = io.read(2048)
          into << block if into
        end
        io.join if io.respond_to? :join
        io.close unless io.closed?
      rescue Aborted
        Log.medium "Consume stream aborted #{Misc.fingerprint io}"
        io.abort if io.respond_to? :abort
        io.close unless io.closed?
      rescue Exception
        Log.medium "Exception consuming stream: #{Misc.fingerprint io}: #{$!.message}"
        io.abort if io.respond_to? :abort
        io.close unless io.closed?
        raise $!
      end
    end
  end

  def self.read_stream(stream, size)
    str = nil
    Thread.pass while IO.select([stream],nil,nil,1).nil?
    while not str = stream.read(size)
      IO.select([stream],nil,nil,1) 
      Thread.pass
      raise ClosedStream if stream.eof?
    end

    while str.length < size
      raise ClosedStream if stream.eof?
      IO.select([stream],nil,nil,1)
      if new = stream.read(size-str.length)
        str << new
      end
    end
    str
  end

  def self.sensiblewrite(path, content = nil, options = {}, &block)
    force = Misc.process_options options, :force

    if Open.exists? path and not force
      Misc.consume_stream content 
      return
    end

    lock_options = Misc.pull_keys options.dup, :lock
    lock_options = lock_options[:lock] if Hash === lock_options[:lock]
    tmp_path = Persist.persistence_path(path, {:dir => Misc.sensiblewrite_dir})
    tmp_path_lock = Persist.persistence_path(path, {:dir => Misc.sensiblewrite_lock_dir})
    Misc.lock tmp_path_lock, lock_options do

      if Open.exists? path and not force
        Misc.consume_stream content 
        return
      end

      FileUtils.mkdir_p File.dirname(tmp_path) unless File.directory? File.dirname(tmp_path)
      FileUtils.rm_f tmp_path if File.exists? tmp_path
      begin
        case
        when block_given?
          File.open(tmp_path, 'wb', &block)
        when String === content
          File.open(tmp_path, 'wb') do |f| f.write content end
        when (IO === content or StringIO === content or File === content)

          Open.write(tmp_path) do |f|
            f.sync = true
            while block = content.read(2048)
              f.write block
            end
          end
        else
          File.open(tmp_path, 'wb') do |f|  end
        end

        begin
          Open.mv tmp_path, path, lock_options
        rescue
          raise $! unless File.exists? path
        end
        content.join if content.respond_to? :join
      rescue Aborted
        Log.medium "Aborted sensiblewrite -- #{ Log.reset << Log.color(:blue, path) }"
        content.abort if content.respond_to? :abort
        Open.rm path if File.exists? path
      rescue Exception
        Log.medium "Exception in sensiblewrite: #{$!.message} -- #{ Log.color :blue, path }"
        content.abort if content.respond_to? :abort
        Open.rm path if File.exists? path
        raise $!
      ensure
        FileUtils.rm_f tmp_path if File.exists? tmp_path
      end
    end
  end

  def self.process_stream(s)
    begin
      yield s
      s.join if s.respond_to? :join
    rescue
      s.abort if s.respond_to? :abort
      raise $!
    end
  end

  def self.sort_stream(stream, header_hash = "#", cmd_args = " -u ")
    Misc.open_pipe do |sin|
      begin
        if defined? Step and Step === stream
          step = stream
          stream = stream.get_stream || stream.path.open
        end

        line = stream.gets
        while line =~ /^#{header_hash}/ do
          sin.puts line
          line = stream.gets
        end

        line_stream = Misc.open_pipe do |line_stream_in|
          begin
            while line
              line_stream_in.puts line
              line = stream.gets
            end
            stream.join if stream.respond_to? :join
          rescue
            stream.abort if stream.respond_to? :abort
            raise $!
          end
        end

        sorted = CMD.cmd("sort #{cmd_args || ""}", :in => line_stream, :pipe => true)

        while block = sorted.read(2048)
          sin.write block
        end
      rescue
        if defined? step and step
          step.abort
        end
      end
    end
  end

  def self.collapse_stream(s, line = nil, sep = "\t", header = nil)
    sep ||= "\t"
    Misc.open_pipe do |sin|
      sin.puts header if header
      process_stream(s) do |s|
        line ||= s.gets

        current_parts = []
        while line 
          key, *parts = line.strip.split(sep, -1)
          current_key ||= key
          case
          when key.nil?
          when current_key == key
            parts.each_with_index do |part,i|
              if current_parts[i].nil?
                current_parts[i] = part
              else
                current_parts[i] = current_parts[i] << "|" << part
              end
            end
          when current_key != key
            sin.puts [current_key, current_parts].flatten * sep
            current_key = key
            current_parts = parts
          end
          line = s.gets
        end

        sin.puts [current_key, current_parts].flatten * sep unless current_key.nil?
      end
    end
  end

  def self._paste_streams(streams, output, lines = nil, sep = "\t", header = nil)
    output.puts header if header
    streams = streams.collect do |stream|
      if defined? Step and Step === stream
        stream.get_stream || stream.join.path.open
      else
        stream
      end
    end

    begin
      done_streams = []
      lines ||= streams.collect{|s| s.gets }
      keys = []
      parts = []
      lines.each_with_index do |line,i|
        key, *p = line.strip.split(sep, -1) 
        keys[i] = key
        parts[i] = p
      end
      sizes = parts.collect{|p| p.length }
      last_min = nil
      while lines.compact.any?
        min = keys.compact.sort.first
        str = []
        keys.each_with_index do |key,i|
          case key
          when min
            str << [parts[i] * sep]
            line = lines[i] = streams[i].gets
            if line.nil?
              keys[i] = nil
              parts[i] = nil
            else
              k, *p = line.strip.split(sep, -1)
              keys[i] = k
              parts[i] = p
            end
          else
            str << [sep * (sizes[i]-1)] if sizes[i] > 0
          end
        end

        output.puts [min, str*sep] * sep
      end
      streams.each do |stream|
        stream.join if stream.respond_to? :join
      end
    rescue 
      Log.exception $!
      streams.each do |stream|
        stream.abort if stream.respond_to? :abort
      end
      raise $!
    end
  end

  def self.paste_streams(streams, lines = nil, sep = "\t", header = nil)
    sep ||= "\t"
    num_streams = streams.length
    Misc.open_pipe do |sin|
      self._paste_streams(streams, sin, lines, sep, header)
    end
  end

  def self.dup_stream(stream)
    stream_dup = stream.dup
    if stream.respond_to? :annotate
      stream.annotate stream_dup
      stream.clear
    end
    tee1, tee2 = Misc.tee_stream stream_dup
    stream.reopen(tee1)
    tee2
  end

  def self.save_stream(file, stream)
    out, save = Misc.tee_stream stream

    Thread.new(Thread.current) do |parent|
      begin
        Misc.sensiblewrite(file, save)
      rescue Exception
        save.abort if save.respond_to? :abort
        stream.abort if stream.respond_to? :abort
        stream.join
        Log.medium "Exception in save_stream: #{$!.message}"
        raise $!
      end
    end

    out
  end

end