Sha256: afe6714b9e2a5a7420ab1b99346982458dd8253248ea8fe2856c359411760bcd

Contents?: true

Size: 1.99 KB

Versions: 17

Compression:

Stored size: 1.99 KB

Contents

require 'rbbt/util/semaphore'

class RbbtProcessQueue
  # Message channel built on a pipe (obtained from Misc.pipe) plus two named
  # semaphores: one serializing writers, one serializing readers.
  #
  # Wire format of one message (see #dump / #load):
  #
  #   bytes 0..3  payload size in bytes, pack directive 'L'
  #   byte  4     type tag: "S" = raw String payload, "M" = Serializer dump
  #   bytes 5..   payload
  class RbbtProcessSocket

    # Serializer applied to every object that is not a String.
    Serializer = Marshal

    attr_accessor :sread, :swrite, :write_sem, :read_sem

    # Opens the pipe and creates the two semaphores, each with an initial
    # value of 1. The semaphore names share a random numeric key.
    def initialize
      @sread, @swrite = Misc.pipe

      # Interpolation instead of `"/" << ...` so no string literal is mutated.
      key = "/#{rand(100000000)}"
      @write_sem = key + '.in'
      @read_sem = key + '.out'
      Log.low "Creating socket semaphores: #{key}"
      RbbtSemaphore.create_semaphore(@write_sem,1)
      RbbtSemaphore.create_semaphore(@read_sem,1)
    end

    # Closes whichever pipe ends are still open and deletes both semaphores.
    def clean
      @sread.close unless @sread.closed?
      @swrite.close unless @swrite.closed?
      Log.low "Destroying socket semaphores"
      RbbtSemaphore.delete_semaphore(@write_sem)
      RbbtSemaphore.delete_semaphore(@read_sem)
    end


    # Writes one framed message to +stream+.
    #
    # obj    - a String is sent verbatim (tag "S"); anything else is sent as
    #          Serializer.dump(obj) (tag "M").
    # stream - any object whose #write(string) returns the number of bytes
    #          it accepted.
    #
    # Returns nil.
    def dump(obj, stream)
      case obj
      when String
        payload = obj
        type = "S"
      else
        payload = Serializer.dump(obj)
        type = "M"
      end

      # The packed header is binary (ASCII-8BIT). The payload bytes are
      # appended as binary too: appending a non-ASCII UTF-8 payload directly
      # raises Encoding::CompatibilityError as soon as the size field holds a
      # byte >= 0x80. The dup keeps the caller's string untouched.
      str = [payload.bytesize, type].pack('La')
      str << payload.dup.force_encoding(Encoding::BINARY)

      # Everything below is counted in bytes, because that is what #write
      # reports; character counts would mis-resume a short write.
      write_length = str.bytesize
      #IO.select(nil, [stream])
      wrote = stream.write(str)
      while wrote < write_length
        wrote += stream.write(str.byteslice(wrote, write_length - wrote))
      end
    end

    # Reads one framed message from +stream+ and returns its content.
    #
    # Returns the payload itself for tag "S", Serializer.load(payload) for
    # tag "M", and nil for any other tag.
    def load(stream)
      # 5 = 4-byte size field + 1-byte type tag, matching pack('La') in #dump.
      size_head = Misc.read_stream stream, 5

      size, type = size_head.unpack('La')

      begin
        payload = Misc.read_stream stream, size
        case type
        when "M"
          Serializer.load(payload)
        when "S"
          payload
        end
      rescue TryAgain
        # NOTE(review): retries the payload read without a bound; presumably
        # TryAgain signals a transient condition -- confirm where it is raised.
        retry
      end
    end

    # True when the read end of the pipe is closed.
    def closed_read?
      @sread.closed?
    end

    # True when the write end of the pipe is closed.
    def closed_write?
      @swrite.closed?
    end

    # Closes the write end of the pipe.
    def close_write
      @swrite.close
    end

    # Closes the read end of the pipe.
    def close_read
      @sread.close 
    end
    #{{{ ACCESSOR
  
    
    # Sends +obj+ through the pipe while holding the write semaphore.
    def push(obj)
      RbbtSemaphore.synchronize(@write_sem) do
        self.dump(obj, @swrite)
      end
    end

    # Receives the next message from the pipe while holding the read
    # semaphore.
    def pop
      RbbtSemaphore.synchronize(@read_sem) do
        self.load(@sread)
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
rbbt-util-5.13.36 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.35 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.34 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.33 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.32 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.31 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.30 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.29 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.28 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.27 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.26 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.25 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.24 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.23 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.22 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.21 lib/rbbt/util/concurrency/processes/socket.rb
rbbt-util-5.13.20 lib/rbbt/util/concurrency/processes/socket.rb