require 'thread' require 'delegate' class InThreads < Delegator attr_reader :enumerable, :thread_count def initialize(enumerable, thread_count = 10, &block) super(enumerable) @enumerable, @thread_count = enumerable, thread_count.to_i unless enumerable.is_a?(Enumerable) raise ArgumentError.new('`enumerable` should include Enumerable.') end if thread_count < 2 raise ArgumentError.new('`thread_count` can\'t be less than 2.') end each(&block) if block end # Creates new instance using underlying enumerable and new thread_count def in_threads(thread_count = 10, &block) self.class.new(enumerable, thread_count, &block) end class << self # Specify runner to use # # use :run_in_threads_consecutive, :for => %w[all? any? none? one?] # # :for is required # :ignore_undefined ignores methods which are not present in Enumerable.instance_methods def use(runner, options) methods = Array(options[:for]) raise 'no methods provided using :for option' if methods.empty? ignore_undefined = options[:ignore_undefined] enumerable_methods = Enumerable.instance_methods.map(&:to_s) methods.each do |method| unless ignore_undefined && !enumerable_methods.include?(method) class_eval <<-RUBY def #{method}(*args, &block) #{runner}(enumerable, :#{method}, *args, &block) end RUBY end end end end use :run_in_threads_return_original_enum, :for => %w[each] use :run_in_threads_return_original_enum, :for => %w[ reverse_each each_with_index enum_with_index each_cons each_slice enum_cons enum_slice zip cycle each_entry ], :ignore_undefined => true use :run_in_threads_consecutive, :for => %w[ all? any? none? one? detect find find_index drop_while take_while partition find_all select reject count collect map group_by max_by min_by minmax_by sort_by flat_map collect_concat ], :ignore_undefined => true use :run_without_threads, :for => %w[ inject reduce max min minmax sort entries to_a to_set drop take first include? member? each_with_object chunk slice_before ], :ignore_undefined => true # Special case method, works by applying run_in_threads_consecutive with map on enumerable returned by blockless run def grep(*args, &block) if block run_in_threads_consecutive(enumerable.grep(*args), :map, &block) else enumerable.grep(*args) end end # befriend with progress gem def with_progress(title = nil, length = nil, &block) ::Progress::WithProgress.new(self, title, length, &block) end protected def __getobj__ @enumerable end def __setobj__(obj) @enumerable = obj end autoload :ThreadLimiter, 'in_threads/thread_limiter' autoload :Filler, 'in_threads/filler' # Use for methods which don't use block result def run_in_threads_return_original_enum(enumerable, method, *args, &block) if block ThreadLimiter.limit(thread_count) do |limiter| enumerable.send(method, *args) do |*block_args| limiter << Thread.new(*block_args, &block) end end else enumerable.send(method, *args) end end # Use for methods which do use block result and fire objects in same way as each def run_in_threads_consecutive(enumerable, method, *args, &block) if block begin enum_a, enum_b = Filler.new(enumerable, 2).extractors results = Queue.new runner = Thread.new do Thread.current.priority = -1 ThreadLimiter.limit(thread_count) do |limiter| enum_a.each do |object| break if Thread.current[:stop] thread = Thread.new(object, &block) results << thread limiter << thread end end end enum_b.send(method, *args) do |object| results.pop.value end ensure runner[:stop] = true runner.join end else enumerable.send(method, *args) end end # Use for methods which don't use blocks or can not use threads def run_without_threads(enumerable, method, *args, &block) enumerable.send(method, *args, &block) end end require 'in_threads/enumerable'