module Misc

  class << self
    attr_accessor :sensiblewrite_dir
    def sensiblewrite_dir
      @sensiblewrite_dir = Rbbt.tmp.sensiblewrite
    end
  end

  PIPE_MUTEX = Mutex.new

  OPEN_PIPE_IN = []
  def self.pipe
    OPEN_PIPE_IN.delete_if{|pipe| pipe.closed? }
    PIPE_MUTEX.synchronize do
      sout, sin = IO.pipe
      OPEN_PIPE_IN << sin

      [sout, sin]
    end
  end
  
  def self.release_pipes(*pipes)
    PIPE_MUTEX.synchronize do
      pipes.flatten.each do |pipe|
        pipe.close unless pipe.closed?
      end
    end
  end

  def self.purge_pipes(*save)
    PIPE_MUTEX.synchronize do
      OPEN_PIPE_IN.each do |pipe|
        next if save.include? pipe
        pipe.close unless pipe.closed?
      end
    end
  end

  def self.open_pipe(do_fork = false, close = true)
    raise "No block given" unless block_given?

    sout, sin = Misc.pipe

    if do_fork
      parent_pid = Process.pid
      pid = Process.fork {
        purge_pipes(sin)
        sout.close
        begin
          yield sin
          sin.close if close and not sin.closed? 
        rescue
          Log.exception $!
          Process.kill :INT, parent_pid
          Kernel.exit! -1
        end
        Kernel.exit! 0
      }
      sin.close #if close
      ConcurrentStream.setup sout, :pids => [pid]
    else
      thread = Thread.new(Thread.current) do |parent|
        begin
          yield sin
          sin.close if close and not sin.closed?
        rescue
          parent.raise $!
        end
      end
      ConcurrentStream.setup sout, :threads => [thread]
    end
    sout
  end

  def self.tee_stream_fork(stream)
    stream_out1, stream_in1 = Misc.pipe
    stream_out2, stream_in2 = Misc.pipe

    splitter_pid = Process.fork do
      Misc.purge_pipes(stream_in1, stream_in2)
      stream_out1.close
      stream_out2.close
      begin
        filename = stream.respond_to?(:filename)? stream.filename : nil
        skip1 = skip2 = false
        while block = stream.read(2048)
          begin stream_in1.write block; rescue Exception;  Log.exception $!; skip1 = true end unless skip1 
          begin stream_in2.write block; rescue Exception;  Log.exception $!; skip2 = true end unless skip2 
        end
        raise "Error writing in stream_in1" if skip1
        raise "Error writing in stream_in2" if skip2
        stream.join if stream.respond_to? :join
        stream_in1.close 
        stream_in2.close 
      rescue Aborted
        stream.abort if stream.respond_to? :abort
        raise $!
      rescue IOError
        Log.exception $!
      rescue Exception
        Log.exception $!
      end
    end
    stream.close
    stream_in1.close
    stream_in2.close

    ConcurrentStream.setup stream_out1, :pids => [splitter_pid]
    ConcurrentStream.setup stream_out2, :pids => [splitter_pid]

    [stream_out1, stream_out2]
  end

  def self.tee_stream_thread(stream)
    stream_out1, stream_in1 = Misc.pipe
    stream_out2, stream_in2 = Misc.pipe

    splitter_thread = Thread.new(Thread.current, stream_in1, stream_in2) do |parent,stream_in1,stream_in2|
      begin
        filename = stream.respond_to?(:filename)? stream.filename : nil
        skip1 = skip2 = false
        while block = stream.read(2048)
          begin stream_in1.write block; rescue Exception; Aborted === $! ? raise($!): Log.exception($!); skip1 = true end unless skip1 
          begin stream_in2.write block; rescue Exception; Aborted === $! ? raise($!): Log.exception($!); skip2 = true end unless skip2 
        end
        stream_in1.close 
        stream_in2.close 
        stream.join if stream.respond_to? :join
      rescue Aborted
        stream.abort if stream.respond_to? :abort
        parent.raise $!
      rescue IOError
        Log.exception $!
      rescue Exception
        Log.exception $!
        parent.raise $!
      end
    end

    ConcurrentStream.setup stream_out1, :threads => splitter_thread
    ConcurrentStream.setup stream_out2, :threads => splitter_thread

    [stream_out1, stream_out2]
  end

  class << self
    alias tee_stream tee_stream_thread 
  end

  def self.read_full_stream(io)
    str = ""
    begin
      while block = io.read(2048)
        str << block
      end
      io.join if io.respond_to? :join
    rescue
      io.abort if io.respond_to? :abort
    end
    str
  end

  def self.consume_stream(io)
    begin
      while block = io.read(2048)
        return if io.eof?
        Thread.pass 
      end
      io.join if io.respond_to? :join
    rescue
      io.abort if io.respond_to? :abort
    end
  end

  def self.read_stream(stream, size)
    str = nil
    Thread.pass while IO.select([stream],nil,nil,1).nil?
    while not str = stream.read(size)
      IO.select([stream],nil,nil,1) 
      Thread.pass
      raise ClosedStream if stream.eof?
    end

    while str.length < size
      raise ClosedStream if stream.eof?
      IO.select([stream],nil,nil,1)
      if new = stream.read(size-str.length)
        str << new
      end
    end
    str
  end

  def self.read_stream(stream, size)
    str = nil
    Thread.pass while IO.select([stream],nil,nil,1).nil?
    while not str = stream.read(size)
      IO.select([stream],nil,nil,1) 
      Thread.pass
      raise ClosedStream if stream.eof?
    end

    while str.length < size
      raise ClosedStream if stream.eof?
      IO.select([stream],nil,nil,1)
      if new = stream.read(size-str.length)
        str << new
      end
    end
    str
  end

  def self.sensiblewrite(path, content = nil, &block)
    return if File.exists? path
    #tmp_path = path + '.sensible_write'
    tmp_path = Persist.persistence_path(path, {:dir => Misc.sensiblewrite_dir})
    Misc.lock tmp_path do
      if not File.exists? path
        FileUtils.rm_f tmp_path if File.exists? tmp_path
        begin
          case
          when block_given?
            File.open(tmp_path, 'w', &block)
          when String === content
            File.open(tmp_path, 'w') do |f| f.write content end
          when (IO === content or StringIO === content or File === content)
            File.open(tmp_path, 'w') do |f|  
              while block = content.read(2048); 
                f.write block
              end  
            end
          else
            File.open(tmp_path, 'w') do |f|  end
          end
          FileUtils.mv tmp_path, path
        rescue Exception
          Log.error "Exception in sensiblewrite: #{$!.message} -- #{ Log.color :blue, path }"
          FileUtils.rm_f path if File.exists? path
          raise $!
        ensure
          FileUtils.rm_f tmp_path if File.exists? tmp_path
        end
      end
    end
  end
end