Sha256: d828fbb078bacde21a4cb163804350ea9b93e50418ef9d177f0919f712d46a9d

Contents?: true

Size: 1.85 KB

Versions: 23

Compression:

Stored size: 1.85 KB

Contents

require 'rbbt/util/semaphore'

class RbbtProcessQueue
  # Pipe-backed channel that exchanges framed messages.
  #
  # Every message on the wire is a 5-byte header followed by the payload:
  # the payload size in bytes as a native-endian 32-bit unsigned integer
  # (pack directive 'L') and a one-byte type tag (pack directive 'a'),
  # "S" for a raw String payload and "M" for a Serializer (Marshal) dump.
  #
  # Writes and reads are each wrapped in RbbtSemaphore.synchronize, using a
  # separate semaphore name for each direction.
  class RbbtProcessSocket

    Serializer = Marshal

    attr_accessor :sread, :swrite, :write_sem, :read_sem

    # Opens the pipe and creates the two semaphores guarding it.
    def initialize
      @sread, @swrite = IO.pipe

      # NOTE(review): the semaphore names derive from a random number below
      # 100000, so two sockets could end up with the same names -- confirm
      # whether RbbtSemaphore tolerates that.
      key = rand(100000).to_s;
      @write_sem = key + '.in'
      @read_sem = key + '.out'
      RbbtSemaphore.create_semaphore(@write_sem,1)
      RbbtSemaphore.create_semaphore(@read_sem,1)
    end

    # Closes both pipe ends (if still open) and deletes both semaphores.
    def clean
      @sread.close unless @sread.closed?
      @swrite.close unless @swrite.closed?
      RbbtSemaphore.delete_semaphore(@write_sem)
      RbbtSemaphore.delete_semaphore(@read_sem)
    end


    # Writes +obj+ to +stream+ as one framed message.
    #
    # Strings are sent verbatim with type tag "S"; any other object is
    # serialized with Serializer and sent with type tag "M".
    #
    # obj    - the object to send.
    # stream - a writable IO.
    #
    # Returns nil.
    def dump(obj, stream)
      case obj
      when String
        payload = obj
        type = "S"
      else
        payload = Serializer.dump(obj)
        type = "M"
      end

      # The packed header is a binary string. Append a binary-tagged copy of
      # the payload: appending a non-ASCII UTF-8 string directly to a header
      # that contains a byte >= 0x80 raises Encoding::CompatibilityError.
      # The copy also leaves the caller's string untouched.
      str = [payload.bytesize, type].pack('La')
      str << payload.dup.force_encoding(Encoding::BINARY)

      # IO#write reports bytes, so progress is tracked in bytes as well.
      write_length = str.bytesize
      IO.select(nil, [stream])
      wrote = stream.write(str)
      while wrote < write_length
        wrote += stream.write(str.byteslice(wrote, write_length - wrote))
      end
    end

    # Reads one framed message from +stream+ and returns its content.
    #
    # Returns the deserialized object for type "M", the raw payload for type
    # "S", and nil for any other type tag.
    # NOTE(review): presumably Misc.read_stream returns exactly the requested
    # number of bytes -- confirm against its definition.
    def load(stream)
      size_head = Misc.read_stream stream, 5

      size, type = size_head.unpack('La')

      begin
        payload = Misc.read_stream stream, size
        case type
        when "M"
          Serializer.load(payload)
        when "S"
          payload
        end
      rescue TryAgain
        # The header has already been consumed; only the payload read is
        # attempted again.
        retry
      end
    end

    #{{{ ACCESSOR
    
    # Sends +obj+ through the write end of the pipe while holding the write
    # semaphore. Any StandardError raised while doing so is swallowed and a
    # ClosedStream instance is returned instead.
    def push(obj)
      begin
        RbbtSemaphore.synchronize(@write_sem) do
          self.dump(obj, @swrite)
        end
      rescue
        return ClosedStream.new
      end
    end

    # Receives the next message from the read end of the pipe while holding
    # the read semaphore. Returns a ClosedStream instance when an IOError or
    # ClosedStream is raised.
    def pop
      begin
        RbbtSemaphore.synchronize(@read_sem) do
          self.load(@sread)
        end
      rescue IOError, ClosedStream
        return ClosedStream.new
      end
    end
  end
end

Version data entries

23 entries across 23 versions & 1 rubygems

Version Path
rbbt-util-5.8.2 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.8.1 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.8.0 lib/rbbt/util/concurrency/processes/socket.rb