Sha256: 008d6e84a01699cb7266bf908d6fcf585d562676acdf246362c758435d4f49e7
Contents?: true
Size: 1.22 KB
Versions: 2
Compression:
Stored size: 1.22 KB
Contents
module Workhorse
  # Abstraction layer of a simple thread pool implementation used by the worker.
  class Pool
    attr_reader :mutex

    def initialize(size)
      @size = size
      @executor = Concurrent::ThreadPoolExecutor.new(
        min_threads: 0,
        max_threads: @size,
        max_queue: 0,
        fallback_policy: :abort,
        auto_terminate: false
      )
      @mutex = Mutex.new
      @active_threads = Concurrent::AtomicFixnum.new(0)
      @on_idle = nil
    end

    def on_idle(&block)
      @on_idle = block
    end

    # Posts a new work unit to the pool.
    def post
      mutex.synchronize do
        if idle.zero?
          fail 'All threads are busy.'
        end

        active_threads = @active_threads
        active_threads.increment

        @executor.post do
          begin
            yield
          ensure
            active_threads.decrement
            @on_idle.try(:call)
          end
        end
      end
    end

    # Returns the number of idle threads.
    def idle
      @size - @active_threads.value
    end

    def wait
      @executor.wait_for_termination
    end

    # Shuts down the pool and waits for termination.
    def shutdown
      @executor.shutdown
      wait
    end
  end
end
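The pattern in this file (a fixed number of slots, a busy check under a mutex, and an idle callback fired when a work unit finishes) can be tried out without any gems. The following is a minimal sketch using only Ruby's standard library. `TinyPool` is a hypothetical name and not part of Workhorse. As an assumption made for brevity, it starts one thread per work unit instead of delegating to `Concurrent::ThreadPoolExecutor`, and it uses a plain integer guarded by the mutex instead of `Concurrent::AtomicFixnum`.

```ruby
# Hypothetical stand-in for illustration only; not the Workhorse implementation.
class TinyPool
  def initialize(size)
    @size = size
    @mutex = Mutex.new
    @active = 0      # number of work units currently running
    @threads = []
    @on_idle = nil
  end

  # Registers a callback that runs after each work unit finishes.
  def on_idle(&block)
    @on_idle = block
  end

  # Returns the number of free slots.
  def idle
    @mutex.synchronize { @size - @active }
  end

  # Runs the block on a new thread, or raises if every slot is taken.
  def post(&work)
    @mutex.synchronize do
      # Check @active directly: Mutex is not reentrant, so calling #idle here would fail.
      raise 'All threads are busy.' if @active >= @size

      @active += 1
      @threads << Thread.new do
        begin
          work.call
        ensure
          @mutex.synchronize { @active -= 1 }
          @on_idle&.call
        end
      end
    end
  end

  # Waits for all posted work units to finish.
  def shutdown
    threads = @mutex.synchronize { @threads.dup }
    threads.each(&:join)
  end
end

pool = TinyPool.new(2)
idle_calls = Queue.new
pool.on_idle { idle_calls << :idle }

gate = Queue.new      # keeps both workers busy until released
results = Queue.new
2.times { |i| pool.post { gate.pop; results << i } }

# Both slots are occupied, so a third post is rejected.
rejected = begin
  pool.post { }
  false
rescue RuntimeError
  true
end

2.times { gate << :go }
pool.shutdown
```

As in `Workhorse::Pool#post`, the busy check and the counter increment happen inside the same critical section, so two concurrent callers cannot both claim the last free slot.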
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
workhorse-0.6.3 | lib/workhorse/pool.rb |
workhorse-0.6.2 | lib/workhorse/pool.rb |