Sha256: 00a4136060565afbf9923d5f16115196ac8628acec1e199ca34e1ce622463889

Contents?: true

Size: 656 Bytes

Versions: 6

Compression:

Stored size: 656 Bytes

Contents

require 'thread'
require 'thread/pool'

Thread.abort_on_exception = true

module Doggy
  class Worker
    # Spawn 10 threads for HTTP requests.
    CONCURRENT_STREAMS = 10

    def initialize(options = {}, &runner)
      @runner = runner
      @threads = options.fetch(:threads)
    end

    def call(jobs)
      results = []
      pool = Thread::Pool.new(@threads)
      tasks = jobs.map { |job|
        pool.process {
          results << [ job, @runner.call(job) ]
        }
      }
      pool.shutdown
      if task_with_errors = tasks.detect { |task| task.exception }
        raise task_with_errors.exception
      end
      results
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
doggy-0.2.2 lib/doggy/worker.rb
doggy-0.2.0 lib/doggy/worker.rb
doggy-0.1.3 lib/doggy/worker.rb
doggy-0.1.2 lib/doggy/worker.rb
doggy-0.1.1 lib/doggy/worker.rb
doggy-0.1.0 lib/doggy/worker.rb