Sha256: df55a02e23a12ae0c3ac35fc79ea1c39ebf1ce36c14236a759c4923448f18cb2
Contents?: true
Size: 1.32 KB
Versions: 1
Compression:
Stored size: 1.32 KB
Contents
require 'thread'

class DatWorkerPool

  # A mutex-guarded FIFO of work items. Items are appended by `push` and
  # removed from the front by `pop`; a `pop` on an empty queue blocks on a
  # condition variable until an item is pushed or the queue is shut down.
  class Queue

    # Arrays of callables, each invoked with no arguments (`each(&:call)`).
    # `on_push_callbacks` run after every successful `push`.
    # `on_pop_callbacks` run after every `pop` that gets past its initial
    # shutdown check. Both run outside the mutex.
    attr_accessor :on_push_callbacks, :on_pop_callbacks

    def initialize
      @work_items         = []
      @shutdown           = false
      @mutex              = Mutex.new
      @condition_variable = ConditionVariable.new

      @on_pop_callbacks  = []
      @on_push_callbacks = []
    end

    # Returns a snapshot (shallow copy) of the pending work items, oldest
    # first. A copy is returned, rather than the internal array, so callers
    # can inspect or iterate it without racing against concurrent
    # `push`/`pop` calls and cannot alter the queue behind the mutex.
    def work_items
      @mutex.synchronize{ @work_items.dup }
    end

    # Add the work_item to the back of the queue and wake up one thread that
    # is blocked in `pop` (the `signal`), then run the push callbacks.
    #
    # Raises a RuntimeError if the queue has been shut down.
    # Returns the `on_push_callbacks` array (the result of `each`).
    def push(work_item)
      raise "Unable to add work while shutting down" if @shutdown
      @mutex.synchronize do
        @work_items << work_item
        @condition_variable.signal
      end
      @on_push_callbacks.each(&:call)
    end

    # Remove and return the oldest work item, blocking while the queue is
    # empty and not shut down.
    #
    # Returns nil immediately, without running callbacks, if the queue is
    # already shut down when called. If a shutdown wakes a blocked call on an
    # empty queue, nil is returned as well, but the pop callbacks still run.
    def pop
      return if @shutdown
      item = @mutex.synchronize do
        # loop rather than a single wait: re-check the condition after every
        # wake-up before touching the array
        @condition_variable.wait(@mutex) while !@shutdown && @work_items.empty?
        @work_items.shift
      end
      @on_pop_callbacks.each(&:call)
      item
    end

    # True when there are no pending work items.
    def empty?
      @mutex.synchronize{ @work_items.empty? }
    end

    # Clear the shutdown flag so `push` and `pop` operate normally again.
    def start
      @shutdown = false
    end

    # Mark the queue as shut down and wake up every thread blocked in `pop`
    # (the `broadcast`). The flag is set before broadcasting so woken threads
    # see it and stop waiting.
    def shutdown
      @shutdown = true
      @mutex.synchronize{ @condition_variable.broadcast }
    end

    # True once `shutdown` has been called and `start` has not been called
    # since.
    def shutdown?
      @shutdown
    end

  end

end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dat-worker-pool-0.4.0 | lib/dat-worker-pool/queue.rb |