Sha256: 5945614ec685b770854173e175c7251239749a6ee9a6d167d672c15c795ea408

Contents?: true

Size: 1.78 KB

Versions: 4

Compression:

Stored size: 1.78 KB

Contents

module PoolParty
  # A small thread pool that runs submitted blocks on at most +max_size+
  # long-lived worker threads. Workers are created lazily, one per call to
  # #process, until the limit is reached; after that #process waits for a
  # worker to become free.
  class ThreadPool
    # A single background thread that runs one assigned block at a time.
    # The thread polls for an assigned block, calls it, then clears it so the
    # worker can be given another one.
    class Worker
      # Seconds the worker thread sleeps between checks for an assigned block.
      IDLE_POLL_INTERVAL = 0.001

      # Creates the worker and starts its polling thread.
      def initialize
        @mutex = Mutex.new
        @block = nil
        @thread = Thread.new do
          loop do
            sleep IDLE_POLL_INTERVAL
            block = get_block
            run_block(block) if block
          end
        end
      end

      # @return [Proc, nil] the block currently assigned, or nil when idle
      def get_block
        @mutex.synchronize { @block }
      end

      # Assigns a block for the worker thread to run.
      #
      # @param block [Proc, nil] the work to run; nil leaves the worker idle
      # @return [Proc, nil] the block that was assigned
      # @raise [RuntimeError] if a block is already assigned
      def set_block(block)
        @mutex.synchronize do
          raise RuntimeError, "Thread already busy." if @block
          @block = block
        end
      end

      # Clears the assigned block, marking the worker as free.
      def reset_block
        @mutex.synchronize { @block = nil }
      end

      # @return [Boolean] true while a block is assigned. A worker whose
      #   thread has died keeps its block and therefore still reports true;
      #   check #alive? to tell the two cases apart.
      def busy?
        @mutex.synchronize { !@block.nil? }
      end

      # @return [Boolean] true while the worker's thread is still running
      def alive?
        @thread.alive?
      end

      private

      # Calls +block+ and then frees the worker. A StandardError raised by the
      # block is reported on stderr instead of killing the thread, so one
      # failing task cannot leave the worker marked busy forever.
      #
      # Other exceptions (ScriptError, SystemExit, ...) are not rescued: they
      # end the thread with the block still assigned, which guarantees no new
      # block is ever handed to the dead worker.
      def run_block(block)
        begin
          block.call
        rescue StandardError => e
          warn "#{self.class}: block raised #{e.class}: #{e.message}"
        end
        reset_block
      end
    end

    # Seconds to sleep between checks while waiting on workers.
    POLL_INTERVAL = 0.01

    attr_accessor :max_size
    attr_reader :workers

    # @param max_size [Integer] maximum number of worker threads to create
    def initialize(max_size = 10)
      @max_size = max_size
      @workers = []
      @mutex = Mutex.new
    end

    # @return [Integer] number of workers currently held by the pool
    def size
      @mutex.synchronize { @workers.size }
    end

    # @return [Boolean] true if any live worker still has a block assigned.
    #   Workers whose thread has died are ignored so they cannot keep the
    #   pool looking busy forever.
    def busy?
      @mutex.synchronize { @workers.any? { |worker| worker.alive? && worker.busy? } }
    end

    # Blocks the caller until no live worker has a block assigned.
    def join
      sleep POLL_INTERVAL while busy?
    end

    # Hands +block+ to a free worker, creating one if the pool is below
    # +max_size+; otherwise polls until a worker becomes available.
    #
    # Selecting the worker and assigning the block happen under the pool
    # mutex, so two callers can never pick the same worker.
    #
    # @return [Proc, nil] the block that was assigned (nil if none was given)
    def process(&block)
      loop do
        @mutex.synchronize do
          worker = find_available_worker
          return worker.set_block(block) if worker
        end
        sleep POLL_INTERVAL
      end
    end

    # Returns an idle worker, or a newly created one if the pool has room.
    # Dead workers are dropped first so their slots can be reused.
    # Does not take the pool mutex itself; #process and #wait_for_worker
    # call it while holding the lock.
    #
    # @return [Worker, nil] nil when every worker is busy and the pool is full
    def find_available_worker
      prune_dead_workers
      free_worker || create_worker
    end

    # Polls until a worker is available and returns it. Each lookup runs
    # under the pool mutex, but the lock is released before returning, so
    # another thread may claim the worker before the caller uses it.
    #
    # @return [Worker]
    def wait_for_worker
      loop do
        worker = @mutex.synchronize { find_available_worker }
        return worker if worker
        sleep POLL_INTERVAL
      end
    end

    # @return [Worker, nil] the first worker with no block assigned, if any
    def free_worker
      @workers.find { |worker| !worker.busy? }
    end

    # Adds a new worker unless the pool already holds +max_size+ workers.
    #
    # @return [Worker, nil] the new worker, or nil when the pool is full
    def create_worker
      return if @workers.size >= @max_size

      worker = Worker.new
      @workers << worker
      worker
    end

    private

    # Removes workers whose thread has terminated. Mutates @workers in place
    # so the array returned by #workers stays the same object.
    def prune_dead_workers
      @workers.delete_if { |worker| !worker.alive? }
    end
  end
end

Version data entries

4 entries across 4 versions & 2 rubygems

Version Path
auser-poolparty-0.1.0 lib/poolparty/thread_pool.rb
auser-poolparty-0.1.1 lib/poolparty/thread_pool.rb
auser-poolparty-0.1.2 lib/poolparty/thread_pool.rb
jtzemp-poolparty-0.1.2 lib/poolparty/thread_pool.rb