Sha256: 0b7239a3aa83d95d0a93fd9db6665a3a92b59d7fca26fddb1a9a309098e73f69

Contents?: true

Size: 1.88 KB

Versions: 2

Compression:

Stored size: 1.88 KB

Contents

module CukeForker
  # Runs queued workers while keeping at most +max+ of them running at once.
  #
  # A worker is any object responding to +start+, +finished?+ and +failed?+.
  # Workers move through three internal lists: pending -> running -> finished.
  #
  # Observers registered through Observable are notified with:
  #   :on_worker_starting, worker            - just before worker.start is called
  #   :on_worker_finished, worker            - after a worker is seen to be finished
  #   :on_eta, time, remaining, finished     - estimated completion Time, the number
  #                                            of pending + running workers, and the
  #                                            number of finished workers
  class WorkerQueue
    include Observable

    # Raised when an ETA is computed before #process has recorded a start time.
    class NotStartedError < StandardError; end

    # max - the number of workers allowed to run at the same time.
    def initialize(max)
      @max = max

      @pending = []
      @running = []
      @finished = []
    end

    # True while there are workers that have not been started yet.
    def backed_up?
      @pending.any?
    end

    # Queues a worker; it is not started until #fill or #process runs.
    def add(worker)
      @pending << worker
    end

    # Starts pending workers as slots become free and returns once every
    # pending worker has been started (running workers may still be active;
    # use #wait_until_finished for those).
    #
    # poll_interval - seconds to sleep between polls when no worker has
    #                 finished; nil (the default) polls without sleeping.
    #
    # Raises ArgumentError when there is pending work but the queue is already
    # "full" with nothing running (i.e. max is 0): no worker could ever start
    # and no running worker could ever free a slot, so looping would hang.
    def process(poll_interval = nil)
      @start_time = Time.now

      if backed_up? && full? && empty?
        raise ArgumentError,
              "cannot process pending workers: max is #{@max.inspect}, so no worker can ever start"
      end

      while backed_up?
        fill
        eta
        poll(poll_interval) while full?
      end
    end

    # Polls until no worker is running, publishing an ETA after each poll.
    #
    # poll_interval - see #process.
    def wait_until_finished(poll_interval = nil)
      until empty?
        poll(poll_interval)
        eta
      end
    end

    # Starts pending workers, in the order they were added, until the queue
    # is full or nothing is pending.
    def fill
      start(@pending.shift) while backed_up? && !full?
    end

    # Moves every running worker that reports finished? to the finished list.
    # When none has finished, sleeps for +seconds+ (if given) instead.
    def poll(seconds = nil)
      done = @running.select(&:finished?)

      if done.empty?
        sleep seconds if seconds
      else
        done.each { |worker| finish(worker) }
      end
    end

    # Number of workers currently running.
    def size
      @running.size
    end

    # True when the number of running workers equals max.
    def full?
      size == @max
    end

    # True when no worker is running (pending workers are not counted).
    def empty?
      @running.empty?
    end

    # True when at least one finished worker reports failed?.
    def has_failures?
      @finished.any?(&:failed?)
    end

    private

    # Announces, starts and tracks a worker as running.
    def start(worker)
      fire :on_worker_starting, worker

      worker.start
      @running << worker
    end

    # Moves a worker from running to finished and announces it.
    def finish(worker)
      @running.delete(worker)
      @finished << worker

      fire :on_worker_finished, worker
    end

    # Publishes an :on_eta event. The estimate is the average wall-clock time
    # per finished worker (elapsed time since #process began divided by the
    # finished count) multiplied by the number of still-pending workers.
    #
    # Nothing is published until at least one worker has finished, because
    # there is no average to extrapolate from yet.
    def eta
      return Time.now if @finished.empty?

      pending_count  = @pending.size
      finished_count = @finished.size

      seconds_per_child = (Time.now - start_time) / finished_count
      estimate = Time.now + (seconds_per_child * pending_count)

      fire :on_eta, estimate, pending_count + size, finished_count
    end

    # Marks the observable as changed and notifies observers with +args+.
    def fire(*args)
      changed
      notify_observers(*args)
    end

    # The Time recorded by #process; raises NotStartedError if it never ran.
    def start_time
      @start_time || raise(NotStartedError)
    end
  end # WorkerQueue
end # CukeForker

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
cukeforker-0.0.2 lib/cukeforker/worker_queue.rb
cukeforker-0.0.1 lib/cukeforker/worker_queue.rb