require 'rbbt/util/semaphore'

class RbbtProcessQueue
  # Message channel built on a pipe and guarded by two named semaphores, one
  # for writers and one for readers.
  #
  # Wire format of a message: a 5-byte header followed by the payload. The
  # header is the payload size in bytes packed with 'L' (32-bit unsigned,
  # native byte order) plus a one-character type tag:
  #   "C" - the payload is the raw bytes of a String
  #   "S" - the payload was produced by the serializer's +dump+
  class RbbtProcessSocket

    attr_accessor :sread, :swrite, :write_sem, :read_sem, :cleaned

    # Opens the pipe and creates the two semaphores.
    #
    # serializer - object responding to +dump+ and +load+; Marshal is used
    #              when none is given.
    def initialize(serializer = nil)
      @sread, @swrite = Misc.pipe

      @serializer = serializer || Marshal

      # The semaphore names share a random, pid-qualified key.
      @key = "/#{rand(1000000000)}.#{Process.pid}"
      @write_sem = @key + '.in'
      @read_sem = @key + '.out'
      Log.debug "Creating socket semaphores: #{@key}"
      # NOTE(review): the second argument is presumably the initial semaphore
      # count - confirm against RbbtSemaphore.
      RbbtSemaphore.create_semaphore(@write_sem,1)
      RbbtSemaphore.create_semaphore(@read_sem,1)
    end

    # Marks the socket as cleaned, closes both pipe ends that are still open
    # and deletes both semaphores.
    def clean
      @cleaned = true
      @sread.close unless @sread.closed?
      @swrite.close unless @swrite.closed?
      Log.debug "Destroying socket semaphores: #{[@key] * ", "}"
      RbbtSemaphore.delete_semaphore(@write_sem)
      RbbtSemaphore.delete_semaphore(@read_sem)
    end

    # Writes +obj+ to +stream+ as one framed message.
    #
    # Strings are sent as-is (tag "C"); any other object goes through the
    # serializer first (tag "S").
    #
    # obj    - the object to send.
    # stream - writable IO-like object whose +write+ returns the number of
    #          bytes written.
    #
    # Returns nil.
    def dump(obj, stream)
      case obj
      when String
        payload = obj
        type = "C"
      else
        payload = @serializer.dump(obj)
        type = "S"
      end

      # pack yields an ASCII-8BIT header. Appending a non-ASCII UTF-8 payload
      # to it raises Encoding::CompatibilityError whenever the header holds a
      # byte >= 0x80, so append a binary copy of the payload instead.
      message = [payload.bytesize, type].pack('La') << payload.b

      # stream.write reports bytes, so progress is tracked in bytes too.
      total = message.bytesize
      wrote = stream.write(message)
      while wrote < total
        wrote += stream.write(message.byteslice(wrote..-1))
      end
    end

    # Reads one framed message from +stream+.
    #
    # Returns the deserialized object for tag "S", the raw payload for tag
    # "C", and nil for any other tag. Errors raised by the serializer are
    # logged and re-raised. When TryAgain is raised, the payload read is
    # attempted again; the header is not read a second time.
    def load(stream)
      size_head = Misc.read_stream stream, 5

      size, type = size_head.unpack('La')

      begin
        payload = Misc.read_stream stream, size
        case type
        when "S"
          begin
            @serializer.load(payload)
          rescue Exception
            # Log for diagnosis, then let the caller see the same exception.
            Log.exception $!
            raise
          end
        when "C"
          payload
        end
      rescue TryAgain
        retry
      end
    end

    # True when the read end of the pipe is closed.
    def closed_read?
      @sread.closed?
    end

    # True when the write end of the pipe is closed.
    def closed_write?
      @swrite.closed?
    end

    # Closes the write end of the pipe.
    def close_write
      @swrite.close
    end

    # Closes the read end of the pipe.
    def close_read
      @sread.close
    end
    #{{{ ACCESSOR


    # Sends +obj+ through the pipe while holding the write semaphore.
    #
    # The object is passed through Annotated.purge before being written.
    # MultipleResult is extended onto the purged object again when the
    # original had it - presumably because purge may return a different
    # object without the marker (confirm against Annotated.purge).
    def push(obj)
      RbbtSemaphore.synchronize(@write_sem) do
        multiple = MultipleResult === obj
        obj = Annotated.purge(obj)
        obj.extend MultipleResult if multiple
        self.dump(obj, @swrite)
      end
    end

    # Receives the next message from the pipe while holding the read
    # semaphore.
    def pop
      RbbtSemaphore.synchronize(@read_sem) do
        self.load(@sread)
      end
    end
  end
end