Sha256: a7b4b5e22d406bfbafcd3e66ce9b2c9622de053d0b9844f1d6126d8a144d7bb3

Contents?: true

Size: 1012 Bytes

Versions: 1

Compression:

Stored size: 1012 Bytes

Contents

module JCukeForker
  class TaskManager < AbstractListener

    def initialize(features, opts={})
      @features = features
      @opts = opts
      @worker_sockets = {}
      @failures = false
      @mutex = Mutex.new
    end

    def on_worker_register(worker_path)
      @worker_sockets[worker_path] = UNIXSocket.open worker_path
      pop_task worker_path
    end

    def on_task_finished(worker_path, feature, status)
      @failures = @failures || !status
      pop_task worker_path
    end

    def on_worker_dead(worker_path)
     socket = @worker_sockets.delete worker_path
     socket.close
    end

    def close
      @worker_sockets.each {|k, v| v.close}
    end

    def has_failures?
      @failures
    end

    private

    def pop_task(worker_path)
        task = '__KILL__'
        @mutex.synchronize do
          if feature = @features.shift
            task = @opts.merge(feature: feature).to_json
          end
        end

      @worker_sockets[worker_path].puts(task)
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jcukeforker-0.2.10 lib/jcukeforker/task_manager.rb