Sha256: bde481bfd3b3eff2a95c1c507e30ad07f73f8592a6dba30abc7deec05faca65c
Contents?: true
Size: 1.5 KB
Versions: 1
Compression:
Stored size: 1.5 KB
Contents
require 'thread'

class DatWorkerPool

  # A FIFO of work items guarded by a mutex. Consumers calling `pop` sleep on
  # a condition variable until `push` adds an item or `shutdown` is called.
  #
  # `on_push_callbacks` and `on_pop_callbacks` are arrays of callables; each
  # is invoked with no arguments, outside of the mutex, after the
  # corresponding operation.
  class Queue

    attr_accessor :on_push_callbacks, :on_pop_callbacks

    def initialize
      @work_items         = []
      @shutdown           = false
      @mutex              = Mutex.new
      @condition_variable = ConditionVariable.new
      @on_pop_callbacks   = []
      @on_push_callbacks  = []
    end

    # Clears the shutdown flag so `push` and `pop` operate normally again.
    def start
      @shutdown = false
    end

    # Flags the queue as shut down and wakes up every thread sleeping in
    # `pop` (`@condition_variable.broadcast`). Items that are still queued
    # are left in place.
    def shutdown
      @shutdown = true
      @mutex.synchronize{ @condition_variable.broadcast }
    end

    # Appends `work_item` and wakes up one thread waiting in `pop`
    # (`@condition_variable.signal`), then runs the push callbacks.
    #
    # Raises a RuntimeError if the queue has been shut down. The flag is
    # checked before the mutex is taken.
    def push(work_item)
      raise "Unable to add work while shutting down" if @shutdown
      @mutex.synchronize do
        @work_items << work_item
        @condition_variable.signal
      end
      @on_push_callbacks.each(&:call)
    end

    # Removes and returns the oldest work item.
    #
    # * Returns nil right away, without running the pop callbacks, if the
    #   queue is already shut down.
    # * Otherwise sleeps the current thread (`@condition_variable.wait`)
    #   until it is signaled via `push` or `shutdown`. The condition is
    #   re-checked in a loop after every wake-up.
    # * When woken by `shutdown` with nothing queued, the result is nil; the
    #   pop callbacks still run in that case.
    def pop
      return if @shutdown
      item = @mutex.synchronize do
        @condition_variable.wait(@mutex) while !@shutdown && @work_items.empty?
        @work_items.shift
      end
      @on_pop_callbacks.each(&:call)
      item
    end

    # Returns a snapshot (shallow copy) of the queued work items, oldest
    # first. A copy is handed out because the mutex is released as soon as
    # this method returns; exposing the internal array would let callers read
    # or mutate it concurrently with `push`/`pop`.
    def work_items
      @mutex.synchronize{ @work_items.dup }
    end

    # True when no work items are queued.
    def empty?
      @mutex.synchronize{ @work_items.empty? }
    end

    # True once `shutdown` has been called and `start` has not been called
    # since.
    def shutdown?
      @shutdown
    end

  end

end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dat-worker-pool-0.5.0 | lib/dat-worker-pool/queue.rb |