Sha256: 34eecd8c72d5894b2873431585a0d79f65e8072e235c48f5a722ed17a14036ff

Contents?: true

Size: 1.65 KB

Versions: 4

Compression:

Stored size: 1.65 KB

Contents

require 'thread'
require 'dat-worker-pool/locked_object'
require 'dat-worker-pool/queue'

class DatWorkerPool

  class DefaultQueue
    include DatWorkerPool::Queue

    attr_reader :on_push_callbacks, :on_pop_callbacks

    def initialize
      @work_items = LockedArray.new
      @cond_var   = ConditionVariable.new

      @on_push_callbacks = []
      @on_pop_callbacks  = []
    end

    def work_items; @work_items.values; end
    def empty?;     @work_items.empty?; end

    def on_push(&block); @on_push_callbacks << block; end
    def on_pop(&block);  @on_pop_callbacks  << block; end

    private

    # wake up workers (`@cond_var.broadcast`) who are sleeping because of `pop`
    def shutdown!
      @work_items.with_lock{ @cond_var.broadcast }
    end

    # add the work item and wakeup (`@cond_var.signal`) the first sleeping
    # worker (from calling `pop`)
    def push!(work_item)
      @work_items.with_lock do |mutex, work_items|
        work_items << work_item
        @cond_var.signal
      end
      @on_push_callbacks.each{ |p| p.call(self, work_item) }
    end

    # check if the queue is empty, if so sleep (`@cond_var.wait(@mutex)`) until
    # signaled (via `push!` or `shutdown!`); once a work item is available pop
    # it from the front and return it; if shutdown, return `nil` which the
    # workers will ignore
    def pop!
      work_item = @work_items.with_lock do |mutex, work_items|
        while !self.shutdown? && work_items.empty?
          @cond_var.wait(mutex)
        end
        work_items.shift unless self.shutdown?
      end
      @on_pop_callbacks.each{ |p| p.call(self, work_item) } if !work_item.nil?
      work_item
    end

  end

end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
dat-worker-pool-0.6.3 lib/dat-worker-pool/default_queue.rb
dat-worker-pool-0.6.2 lib/dat-worker-pool/default_queue.rb
dat-worker-pool-0.6.1 lib/dat-worker-pool/default_queue.rb
dat-worker-pool-0.6.0 lib/dat-worker-pool/default_queue.rb