Sha256: 3e0e5330384da5b8f46771c90d759320e2b6d2a945eeae5dd4e014637749e9fa

Contents?: true

Size: 1.89 KB

Versions: 28

Compression:

Stored size: 1.89 KB

Contents

require 'rbbt/util/semaphore'

class RbbtProcessQueue
  # Message channel built on a pipe. Every message is framed as a 5-byte
  # header (payload size + one-character type tag) followed by the payload
  # bytes. Writers and readers are serialized through two named semaphores
  # so that frames from different processes do not interleave.
  class RbbtProcessSocket

    # Serializer used for every object that is not a String.
    Serializer = Marshal

    # Header layout for Array#pack / String#unpack: a native-endian unsigned
    # 32-bit payload size ('L') followed by a single type character ('a').
    HEADER_FORMAT = 'La'

    # Number of bytes produced by HEADER_FORMAT (4 for the size + 1 for the tag).
    HEADER_SIZE = 5

    attr_accessor :sread, :swrite, :write_sem, :read_sem

    # Opens the pipe and creates one semaphore per pipe end.
    def initialize
      # NOTE(review): Misc.pipe is assumed to return [read_end, write_end] -- confirm.
      @sread, @swrite = Misc.pipe

      # Both semaphore names share a random numeric key.
      key = rand(100000000).to_s
      @write_sem = key + '.in'
      @read_sem = key + '.out'
      # NOTE(review): the second argument is presumably the initial semaphore
      # value (1 => mutex-like) -- confirm against RbbtSemaphore.
      RbbtSemaphore.create_semaphore(@write_sem,1)
      RbbtSemaphore.create_semaphore(@read_sem,1)
    end

    # Closes whichever pipe ends are still open and deletes both semaphores.
    def clean
      @sread.close unless @sread.closed?
      @swrite.close unless @swrite.closed?
      RbbtSemaphore.delete_semaphore(@write_sem)
      RbbtSemaphore.delete_semaphore(@read_sem)
    end


    # Writes +obj+ to +stream+ as one framed message.
    #
    # Strings are sent verbatim with type tag "S"; any other object is
    # serialized with Serializer and sent with type tag "M".
    #
    # obj    - the object to send.
    # stream - any object whose #write returns the number of bytes written.
    #
    # Returns nil.
    def dump(obj, stream)
      case obj
      when String
        type = "S"
        # Work on a binary copy: the header is binary, and appending a
        # non-ASCII string in another encoding to it raises
        # Encoding::CompatibilityError whenever the header itself holds a
        # byte >= 0x80. The copy also leaves the caller's string untouched.
        payload = obj.dup.force_encoding(Encoding::BINARY)
      else
        type = "M"
        payload = Serializer.dump(obj)
      end

      str = [payload.bytesize, type].pack(HEADER_FORMAT) << payload

      # stream.write reports bytes, so all accounting must be in bytes too;
      # counting characters would resume a partial write at the wrong offset.
      write_length = str.bytesize
      #IO.select(nil, [stream])
      wrote = stream.write(str)
      while wrote < write_length
        wrote += stream.write(str.byteslice(wrote, write_length - wrote))
      end
    end

    # Reads one framed message from +stream+.
    #
    # Returns the deserialized object for type "M", the raw payload string
    # for type "S", and nil for any other type tag.
    def load(stream)
      size_head = Misc.read_stream stream, HEADER_SIZE

      size, type = size_head.unpack(HEADER_FORMAT)

      begin
        payload = Misc.read_stream stream, size
        case type
        when "M"
          Serializer.load(payload)
        when "S"
          payload
        end
      rescue TryAgain
        # Only the payload read is repeated; the header has already been
        # consumed. NOTE(review): TryAgain is presumably raised by
        # Misc.read_stream on a transient failure, and the retry is
        # unbounded -- confirm this is intended.
        retry
      end
    end

    # True when the read end of the pipe is closed.
    def closed_read?
      @sread.closed?
    end

    # True when the write end of the pipe is closed.
    def closed_write?
      @swrite.closed?
    end

    # Closes the write end of the pipe.
    def close_write
      @swrite.close
    end

    # Closes the read end of the pipe.
    def close_read
      @sread.close
    end
    #{{{ ACCESSOR


    # Sends +obj+ through the pipe inside the write semaphore.
    def push(obj)
      RbbtSemaphore.synchronize(@write_sem) do
        self.dump(obj, @swrite)
      end
    end

    # Receives the next message from the pipe inside the read semaphore.
    def pop
      RbbtSemaphore.synchronize(@read_sem) do
        self.load(@sread)
      end
    end
  end
end

Version data entries

28 entries across 28 versions & 1 rubygems

Version Path
rbbt-util-5.13.16 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.15 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.14 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.13 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.12 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.11 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.10 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.9 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.8 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.7 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.6 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.5 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.4 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.3 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.2 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.1 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.0 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.12.3 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.12.2 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.12.1 lib/rbbt/util/concurrency/processes/socket.rb