Sha256: 3fe12a1e03cae3fa6610975a41b0be7fa41f8f198264df949ed000ddfa54a6ad

Contents?: true

Size: 935 Bytes

Versions: 2

Compression:

Stored size: 935 Bytes

Contents

require 'thread'

class DatWorkerPool

  # A single worker: runs a loop on its own thread that pops items from a
  # queue and hands each one to the `on_work` callback.
  #
  # Callbacks all default to no-op procs and are replaced via the writers:
  #
  # * on_work       - called with each truthy item popped from the queue
  # * on_waiting    - called with the worker right before it pops
  # * on_continuing - called with the worker right after a pop returns
  # * on_shutdown   - called with the worker when its loop exits, including
  #                   when it exits because a callback raised
  class Worker
    attr_writer :on_work, :on_waiting, :on_continuing, :on_shutdown

    # queue - the object work items are popped from; it only needs to respond
    #         to `pop`. NOTE(review): presumably `pop` blocks until an item is
    #         available or the queue is shut down - confirm against the queue
    #         implementation used by the pool.
    def initialize(queue)
      @queue = queue
      @on_work       = proc{ |work_item| }
      @on_waiting    = proc{ |worker| }
      @on_continuing = proc{ |worker| }
      @on_shutdown   = proc{ |worker| }

      @shutdown = false
      @thread   = nil
    end

    # Spawns the worker thread unless one is already assigned. Returns the
    # thread.
    def start
      @thread ||= Thread.new{ work_loop }
    end

    # Truthy while the worker thread exists and is alive.
    #
    # The worker thread clears `@thread` itself when its loop exits, so the
    # ivar is read exactly once here; reading it twice could see it turn nil
    # between the nil check and the `alive?` call.
    def running?
      thread = @thread
      thread && thread.alive?
    end

    # Flags the worker to stop. The flag is only looked at after `@queue.pop`
    # returns, so a worker that is waiting in `pop` keeps waiting until the
    # queue hands it something; the item popped at that point is not passed to
    # `on_work`.
    def shutdown
      @shutdown = true
    end

    # Waits for the worker thread to finish. Arguments are forwarded to
    # `Thread#join` (e.g. a timeout). Returns whatever `Thread#join` returns,
    # or nil if there is no live thread to wait on.
    #
    # Works on a local snapshot of `@thread` for the same reason as
    # `running?`: the worker thread may set the ivar to nil at any moment, and
    # calling `join` on the ivar after a separate liveness check could raise
    # NoMethodError on nil.
    def join(*args)
      thread = @thread
      thread.join(*args) if thread && thread.alive?
    end

    protected

    # Body of the worker thread: wait for an item, process it, repeat until
    # the shutdown flag is seen.
    def work_loop
      loop do
        @on_waiting.call(self)
        work_item = @queue.pop
        @on_continuing.call(self)
        # checked after the pop so a worker woken up during shutdown exits
        # without processing what it was handed
        break if @shutdown
        # a nil/false item is skipped rather than passed to the callback
        @on_work.call(work_item) if work_item
      end
    ensure
      @on_shutdown.call(self)
      # clear the thread so `start` can spawn a new one
      @thread = nil
    end

  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
dat-worker-pool-0.3.0 lib/dat-worker-pool/worker.rb
dat-worker-pool-0.2.0 lib/dat-worker-pool/worker.rb