Sha256: ea63c5e623adc883ca3644e20bd1be3414522390e2756a999d6fb0e4719ec2bd

Contents?: true

Size: 1.08 KB

Versions: 1

Compression:

Stored size: 1.08 KB

Contents

module Workhorse
  # Abstraction layer of a simple thread pool implementation used by the worker.
  #
  # Wraps a Concurrent::ThreadPoolExecutor and keeps its own count of work
  # units that have been accepted but not yet finished, so callers can ask
  # how many slots are free (#idle) and are refused when none are (#post).
  class Pool
    # Mutex serializing the capacity check and submission performed by #post.
    attr_reader :mutex

    # Creates a pool that accepts at most +size+ concurrent work units.
    #
    # @param size [Integer] upper bound passed to the executor as max_threads
    #   and used as the capacity limit enforced by #post.
    def initialize(size)
      @size = size
      # NOTE(review): the exact meaning of max_queue: 0 and of the :abort
      # fallback policy is defined by concurrent-ruby - confirm against its
      # documentation. The capacity limit relied upon here is the one
      # enforced explicitly in #post.
      @executor = Concurrent::ThreadPoolExecutor.new(
        min_threads: 0,
        max_threads: @size,
        max_queue: 0,
        fallback_policy: :abort,
        auto_terminate: false
      )
      @mutex = Mutex.new
      @active_threads = Concurrent::AtomicFixnum.new(0)
    end

    # Posts a new work unit to the pool.
    #
    # The given block is handed to the executor. The active counter is
    # incremented before submission and decremented once the block has
    # finished, whether it returned normally or raised.
    #
    # @yield the work to perform on a pool thread
    # @return whatever the executor's #post returns
    # @raise [RuntimeError] 'All threads are busy.' if the pool is at capacity
    # @raise [StandardError] any error raised by the executor while accepting
    #   the task is re-raised unchanged
    def post
      mutex.synchronize do
        if @active_threads.value >= @size
          fail 'All threads are busy.'
        end

        # Captured in a local so the task block refers to the counter object
        # directly.
        active_threads = @active_threads

        active_threads.increment

        begin
          @executor.post do
            begin
              yield
            ensure
              active_threads.decrement
            end
          end
        rescue StandardError
          # The executor refused the task, so the block above will never run
          # and its ensure clause will never fire. Undo the increment here;
          # otherwise every rejected post would permanently consume a slot.
          active_threads.decrement
          raise
        end
      end
    end

    # Returns the number of idle threads.
    #
    # @return [Integer] configured size minus the number of work units
    #   currently counted as active
    def idle
      @size - @active_threads.value
    end

    # Shuts down the pool
    #
    # Asks the executor to shut down and then waits for its termination.
    def shutdown
      @executor.shutdown
      @executor.wait_for_termination
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
workhorse-0.0.2 lib/workhorse/pool.rb