Sha256: 369128c0c846c205bc5cc208af8ac773df493a3026af75b25131f8e2c86b1eb2

Contents?: true

Size: 1.44 KB

Versions: 5

Compression:

Stored size: 1.44 KB

Contents

# frozen_string_literal: true

module Delayed
  module Master
    class Worker
      # A fixed-size pool of worker threads fed by one scheduler thread
      # through a bounded queue.
      #
      # Typical call order: #work (start consumers), #schedule (start the
      # producer), then #wait for a graceful finish or #shutdown to abort.
      # #wait and #shutdown tolerate a pool whose scheduler and/or workers
      # were never started.
      class ThreadPool
        # @param worker [Object] passed through to the +:thread+ lifecycle
        #   callbacks that wrap every thread this pool starts
        # @param size [Integer] number of worker threads and capacity of the
        #   internal queue
        def initialize(worker, size)
          @worker = worker
          @size = size
          @queue = SizedQueue.new(@size)
          # Seconds the scheduler sleeps between checks for an idle worker.
          @queue_delay = 0.5
        end

        # Starts the scheduler thread. Whenever a thread is waiting on the
        # queue, the given block is called to obtain the next item; a truthy
        # result is queued for the workers. The first +nil+/+false+ result
        # ends scheduling: one +:exit+ marker per worker is queued and the
        # scheduler thread finishes.
        #
        # @yieldreturn [Object, nil] next item to process, or a falsy value
        #   when there is nothing more to schedule
        # @return [Thread] the scheduler thread
        def schedule
          @scheduler = Thread.new do
            Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do
              loop do
                # Only fetch new work once somebody is ready to take it.
                sleep @queue_delay while @queue.num_waiting.zero?

                item = yield
                unless item
                  # One marker per worker so that every worker loop stops.
                  @size.times { @queue.push(:exit) }
                  break
                end

                @queue.push(item)
                # Give a worker the chance to pick the item up right away.
                Thread.pass
              end
            end
          end
        end

        # Starts +size+ worker threads. Each one pops items off the queue and
        # yields them to the given block until it receives the +:exit+ marker.
        #
        # @yieldparam item [Object] an item queued by the scheduler
        # @return [Array<Thread>] the worker threads
        def work
          @threads = Array.new(@size) do
            Thread.new do
              Delayed::Worker.lifecycle.run_callbacks(:thread, @worker) do
                loop do
                  item = @queue.pop
                  break if item == :exit

                  yield item
                end
              end
            end
          end
        end

        # Blocks until the scheduler thread and all worker threads have
        # finished. Threads that were never started are skipped.
        #
        # @return [Array<Thread>, nil] the worker threads, or +nil+ when
        #   #work was never called
        def wait
          @scheduler&.join
          @threads&.each(&:join)
        end

        # Kills the scheduler and worker threads (if they were started) and
        # closes the queue. Safe to call on a pool that was never started.
        #
        # @return [SizedQueue] the closed queue
        def shutdown
          @scheduler&.kill
          @threads&.each(&:kill)
          @queue.close
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
delayed_job_master-3.1.2 lib/delayed/master/worker/thread_pool.rb
delayed_job_master-3.1.1 lib/delayed/master/worker/thread_pool.rb
delayed_job_master-3.1.0 lib/delayed/master/worker/thread_pool.rb
delayed_job_master-3.0.1 lib/delayed/master/worker/thread_pool.rb
delayed_job_master-3.0.0 lib/delayed/master/worker/thread_pool.rb