Sha256: 476e1e9e5645b392bd30a2fce8266bdfb800c7b651a7495477ed020b877d8803

Contents?: true

Size: 1.26 KB

Versions: 7

Compression:

Stored size: 1.26 KB

Contents

module ConcurrentWorker
  require 'thread'

  # Tracks outstanding requests with two queues:
  #
  # * +@count+ holds every pushed request until it is popped, so its size is
  #   the number of requests still pending.
  # * +@com+ receives one +true+ token per #pop; #wait_until_less_than blocks
  #   on it so that it is woken each time a request is taken.
  class RequestCounter
    def initialize
      @count = Queue.new
      @com = Queue.new
    end

    # Registers a pending request.
    def push(args)
      @count.push(args)
    end

    # Removes and returns the oldest pending request (blocking while none is
    # available) and posts a wake-up token for #wait_until_less_than.
    #
    # Asynchronous interrupts are deferred for the whole block so that the
    # removal and the token push are not split by one.
    def pop
      Thread.handle_interrupt(Object => :never) do
        taken = @count.pop
        @com.push(true)
        taken
      end
    end

    # Blocks the caller until fewer than +n+ requests are pending.
    # Returns nil.
    def wait_until_less_than(n)
      # Re-check the size after every token; stop if the token is falsy.
      until @count.size < n
        break unless @com.pop
      end
    end

    # True when no request is pending.
    def empty?
      @count.empty?
    end

    # Number of pending requests.
    def size
      @count.size
    end

    # Closes the request queue.
    def close
      @count.close
    end

    # True once the request queue has been closed.
    def closed?
      @count.closed?
    end
  end

  # Two-way object channel between the process that created it and another
  # process sharing the same pipes (presumably a forked child - confirm with
  # callers). Objects are serialized with Marshal over two binary pipes:
  # +@p2c+ carries creator-to-peer traffic and +@c2p+ the reverse.
  #
  # Each process must call #choose_io once before using #send / #recv.
  class IPCDuplexChannel
    def initialize
      @p_pid = Process.pid
      @p2c = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
      @c2p = IO.pipe('ASCII-8BIT', 'ASCII-8BIT')
    end

    # Picks this process's write and read ends based on whether it is the
    # creating process, then closes the two ends that belong to the peer.
    def choose_io
      w_pipe, r_pipe = @p_pid == Process.pid ? [@p2c, @c2p] : [@c2p, @p2c]
      @wio = w_pipe[1]
      @rio = r_pipe[0]
      [w_pipe[0], r_pipe[1]].each(&:close)
    end

    # Serializes +obj+ onto the write end.
    #
    # NOTE: this shadows Object#send on instances of this class; use
    # +__send__+ for dynamic dispatch on a channel object.
    def send(obj)
      Marshal.dump(obj, @wio)
    end

    # Reads and returns the next object from the read end.
    #
    # Raises StopIteration when the stream fails with IOError; EOFError is a
    # subclass of IOError, so this also covers the peer closing its write end.
    # Marshal.load must only be fed data from a trusted peer.
    def recv
      Marshal.load(@rio)
    rescue IOError
      raise StopIteration
    end

    # Closes every pipe end still open in this process. Safe to call more
    # than once, and safe to call even if #choose_io was never invoked.
    def close
      (@p2c + @c2p).each { |io| io.close unless io.closed? }
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
concurrent_worker-0.4.12 lib/concurrent_worker/common.rb
concurrent_worker-0.4.11 lib/concurrent_worker/common.rb
concurrent_worker-0.4.10 lib/concurrent_worker/common.rb
concurrent_worker-0.4.9 lib/concurrent_worker/common.rb
concurrent_worker-0.4.8 lib/concurrent_worker/common.rb
concurrent_worker-0.4.7 lib/concurrent_worker/common.rb
concurrent_worker-0.4.6 lib/concurrent_worker/common.rb