Sha256: df55a02e23a12ae0c3ac35fc79ea1c39ebf1ce36c14236a759c4923448f18cb2

Contents?: true

Size: 1.32 KB

Versions: 1

Compression:

Stored size: 1.32 KB

Contents

require 'thread'

# Namespace class for the worker pool. This file defines (or re-opens) it only
# to hold the pool's work queue.
class DatWorkerPool

  # A mutex-guarded FIFO of work items shared between producers (`push`) and
  # consumers (`pop`). Consumers block in `pop` while the queue is empty and
  # are woken either by a `push` or by `shutdown`.
  class Queue

    # Arrays of callables (objects responding to `call` with no arguments).
    # Every entry of `on_push_callbacks` is called after each `push`, and every
    # entry of `on_pop_callbacks` after each `pop` that got past its shutdown
    # guard. Callbacks run on the calling thread, outside of the queue's mutex.
    attr_accessor :on_push_callbacks, :on_pop_callbacks

    def initialize
      @work_items = []
      @shutdown = false
      @mutex = Mutex.new
      @condition_variable = ConditionVariable.new

      @on_pop_callbacks  = []
      @on_push_callbacks = []
    end

    # Returns the queued work items. Note this is the live internal array, not
    # a copy: the mutex only guards the read itself, so callers should treat
    # the result as read-only.
    def work_items
      @mutex.synchronize{ @work_items }
    end

    # Add the work_item and wake up the first worker (the `signal`) that's
    # waiting in `pop`, then run the push callbacks.
    #
    # Raises a RuntimeError if the queue has been shut down. The shutdown check
    # is made while holding the mutex (and `shutdown` sets the flag under the
    # same mutex) so an item can't be enqueued after `shutdown` has woken the
    # workers, which would leave it stranded in the queue.
    def push(work_item)
      @mutex.synchronize do
        raise "Unable to add work while shutting down" if @shutdown
        @work_items << work_item
        @condition_variable.signal
      end
      @on_push_callbacks.each(&:call)
    end

    # Remove and return the oldest work item, blocking while the queue is empty
    # and not shut down.
    #
    # Returns nil immediately (without running the pop callbacks) if the queue
    # is already shut down. A caller that was blocked and is woken by
    # `shutdown` gets whatever `shift` yields (nil for an empty queue) and the
    # pop callbacks are still run in that case.
    def pop
      return if @shutdown
      item = @mutex.synchronize do
        # loop (rather than a single wait) to guard against spurious wake-ups
        # and against another worker taking the item first
        @condition_variable.wait(@mutex) while !@shutdown && @work_items.empty?
        @work_items.shift
      end
      @on_pop_callbacks.each(&:call)
      item
    end

    # Returns true if there are no queued work items.
    def empty?
      @mutex.synchronize{ @work_items.empty? }
    end

    # Clear the shutdown flag so that `push` and `pop` operate normally again.
    def start
      @shutdown = false
    end

    # Flag the queue as shut down and wake up any workers who are idle (blocked
    # in `pop`). The flag is set inside the same critical section as the
    # `broadcast` so that `push` sees a consistent view of it.
    def shutdown
      @mutex.synchronize do
        @shutdown = true
        @condition_variable.broadcast
      end
    end

    # Returns true if the queue has been shut down and not started again.
    def shutdown?
      @shutdown
    end

  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
dat-worker-pool-0.4.0 lib/dat-worker-pool/queue.rb