Sha256: c6848e2ac102dc25859ccdcf69b1931355b00a7285b394906688288e9f4b17fc

Contents?: true

Size: 1.8 KB

Versions: 15

Compression:

Stored size: 1.8 KB

Contents

require "thread"

module Myreplicator
  ##
  # Runs the jobs placed on #queue concurrently, one thread per job,
  # never running more than the configured number of jobs at once.
  #
  # Each job is a Hash-like object read through two keys:
  #   :params - the single argument handed to the job block
  #   :block  - the block to execute; it is instance_exec'd on a
  #             freshly created Transporter
  ##
  class Parallelizer
    # Concurrency cap used when the :max_threads option is not given.
    DEFAULT_MAX_THREADS = 10

    attr_accessor :queue

    ##
    # Accepts an optional trailing options Hash.
    #   :max_threads - maximum number of jobs running at the same time
    #                  (defaults to DEFAULT_MAX_THREADS)
    ##
    def initialize(*args)
      # Plain-Ruby replacement for extract_options!, so the class does not
      # rely on a core extension being loaded by someone else.
      options = args.last.is_a?(Hash) ? args.pop : {}
      @queue = Queue.new
      @threads = []
      @lock = Mutex.new                       # guards @threads and @done
      @pool_changed = ConditionVariable.new   # signalled when a worker leaves
      @max_threads = options[:max_threads] || DEFAULT_MAX_THREADS
    end

    ##
    # Runs while there are jobs in the queue.
    # Blocks whenever the pool is full and resumes as soon as a worker
    # finishes; returns once every started job has completed.
    # Returns immediately when the queue is empty.
    #
    # A job that raises only terminates its own thread; it still frees
    # its slot, so the remaining jobs run and this method returns.
    ##
    def run
      @done = false
      # Always allow at least one worker so a zero or negative
      # :max_threads cannot stall the dispatcher forever.
      limit = [@max_threads, 1].max

      until @queue.empty?
        @lock.synchronize do
          @pool_changed.wait(@lock) while @threads.size >= limit
          # Spawned under the lock: the worker cannot deregister itself
          # before it has been registered in @threads.
          @threads << start_worker(@queue.pop)
        end
      end

      # Waits until all threads are completed before exiting
      manage_threads.join
    end

    ##
    # Starts and returns a background thread that waits until no worker
    # threads are left, then marks the run as done if the queue is
    # empty as well.
    ##
    def manage_threads
      Thread.new do
        @lock.synchronize do
          @pool_changed.wait(@lock) until @threads.empty?
          @done = true if @queue.empty?
        end
      end
    end

    private

    # Starts the thread that executes a single job and returns it.
    def start_worker(job)
      Thread.new(job) do |task|
        begin
          Thread.current[:status] = 'running' # Manually set thread state for checks
          Transporter.new.instance_exec(task[:params], &task[:block])
          Thread.current[:status] = 'done'
        ensure
          # Reached on success, on error and on Thread#kill alike.
          Thread.current[:status] = 'failed' unless Thread.current[:status] == 'done'
          release_worker(Thread.current)
        end
      end
    end

    # Removes a finished worker from the pool and wakes up every waiter.
    def release_worker(thread)
      @lock.synchronize do
        @threads.delete(thread)
        @pool_changed.broadcast
      end
    end
  end
end

Version data entries

15 entries across 15 versions & 1 rubygems

Version Path
myreplicator-0.0.16 lib/transporter/parallelizer.rb
myreplicator-0.0.15 lib/transporter/parallelizer.rb
myreplicator-0.0.14 lib/transporter/parallelizer.rb
myreplicator-0.0.13 lib/transporter/parallelizer.rb
myreplicator-0.0.12 lib/transporter/parallelizer.rb
myreplicator-0.0.11 lib/transporter/parallelizer.rb
myreplicator-0.0.10 lib/transporter/parallelizer.rb
myreplicator-0.0.9 lib/transporter/parallelizer.rb
myreplicator-0.0.7 lib/transporter/parallelizer.rb
myreplicator-0.0.6 lib/transporter/parallelizer.rb
myreplicator-0.0.5 lib/transporter/parallelizer.rb
myreplicator-0.0.4 lib/transporter/parallelizer.rb
myreplicator-0.0.3 lib/transporter/parallelizer.rb
myreplicator-0.0.2 lib/transporter/parallelizer.rb
myreplicator-0.0.1 lib/transporter/parallelizer.rb