Sha256: 796394c4cc3491e7918f2b1e08d2e46d9f22d3596290bc433a57408ceaa614b5

Contents?: true

Size: 1.87 KB

Versions: 1

Compression:

Stored size: 1.87 KB

Contents

# frozen_string_literal: true

require 'logger'

class ConcurrentExecutor
  MAX_NUMBER_OF_THREADS = 100
  attr_accessor :threads, :queue, :number_of_threads, :executor, :logger

  def initialize(number_of_threads: 4, queue_size: 100, executor: nil)
    raise 'queue must be sized' unless queue_size
    raise 'number of threads must be > 0' unless (0..MAX_NUMBER_OF_THREADS).cover?(number_of_threads)

    @errors = Queue.new
    @errored = false

    self.logger = if defined?(Rails)
                    Rails.logger
                  elsif defined?(App) && App.respond_to?(:logger)
                    App.logger
                  else
                    Logger.new(STDERR)
                  end

    self.threads = []
    self.executor = executor
    self.queue = SizedQueue.new(queue_size)
    self.number_of_threads = number_of_threads

    start_threads
  end

  def consume_enumerable(enum)
    enum.each(&queue.method(:push))
  rescue ClosedQueueError
    self.class.logger.warn 'Queue closed during iteration'
  end

  class << self
    def consume_enumerable(enum, **args, &blk)
      executor = new(args.merge(executor: blk))
      executor.consume_enumerable(enum)
    rescue StandardError => e
      puts e
      raise e
    ensure
      executor&.graceful_shutdown
    end
  end

  def graceful_shutdown
    queue.close
    threads.map(&:join)
    raise @errors.pop(false) unless @errors.empty?
  end

  private

  def start_threads
    number_of_threads.times do
      threads << if Thread.respond_to?(:new_traced)
                   Thread.new_traced(&method(:work_loop))
                 else
                   Thread.new(&method(:work_loop))
                 end
    end
  end

  def work_loop
    while (work_item = queue.pop)
      begin
        executor.call(work_item)
      rescue StandardError => e
        @errors << e
        queue.clear
        queue.close
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
concurrent_executor-1.0.0 lib/concurrent_executor.rb