Sha256: 703888fe227b9fbac0dd8deadea536a27ce3ebbbbc5f083ab7a40c7c0b8fc454

Contents?: true

Size: 1.58 KB

Versions: 2

Compression:

Stored size: 1.58 KB

Contents

# reference implementation: puma
# https://github.com/puma/puma/blob/v2.7.1/lib/puma/thread_pool.rb

require 'thread'
require 'promise_pool/queue'
require 'promise_pool/task'

module PromisePool
  class ThreadPool
    attr_reader :workers
    attr_accessor :max_size, :idle_time

    def initialize max_size, idle_time=60
      @max_size  = max_size
      @idle_time = idle_time
      @queue     = Queue.new
      @mutex     = Mutex.new
      @workers   = []
      @waiting   = 0
    end

    def size
      workers.size
    end

    def defer promise_mutex, &job
      mutex.synchronize do
        task = Task.new(job, promise_mutex)
        queue << task
        spawn_worker if waiting < queue.size && workers.size < max_size
        task
      end
    end

    def trim force=false
      mutex.synchronize do
        queue << lambda{ |_| false } if force || waiting > 0
      end
    end

    # Block on shutting down, and should not add more jobs while shutting down
    def shutdown
      workers.size.times{ trim(true) }
      workers.first.join && trim(true) until workers.empty?
      mutex.synchronize{ queue.clear }
    end

    protected
    attr_reader :queue, :mutex, :condv, :waiting

    private
    def spawn_worker
      workers << Thread.new{
        Thread.current.abort_on_exception = true

        task = nil
        begin
          mutex.synchronize do
            @waiting += 1
            task = queue.pop(mutex, idle_time)
            @waiting -= 1
          end
        end while task.call(Thread.current)

        mutex.synchronize{ workers.delete(Thread.current) }
      }
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
promise_pool-0.9.1 lib/promise_pool/thread_pool.rb
promise_pool-0.9.0 lib/promise_pool/thread_pool.rb