Sha256: 4d04b52cb81346f5c1149b24907296d7fff5bba5cd8ca4abf26f2a758ead0031

Contents?: true

Size: 1.57 KB

Versions: 1

Compression:

Stored size: 1.57 KB

Contents

module Pallets
  # Owns the pool of workers and the scheduler: builds them, starts them,
  # shuts them down, and lets workers remove or replace themselves.
  #
  # @workers is mutated by #remove_worker and #replace_worker under @lock, so
  # every other access in this class goes through the same lock as well.
  class Manager
    attr_reader :workers, :scheduler

    # Builds +concurrency+ workers and one scheduler, all sharing the backend
    # returned by Pallets.backend. Nothing is started until #start is called.
    def initialize(concurrency: Pallets.configuration.concurrency)
      @backend = Pallets.backend
      @workers = concurrency.times.map { Worker.new(self, @backend) }
      @scheduler = Scheduler.new(self, @backend)
      @lock = Mutex.new
      @needs_to_stop = false
    end

    # Starts every worker, then the scheduler.
    def start
      @workers.each(&:start)
      @scheduler.start
    end

    # Attempt to gracefully shutdown every worker. If any is still busy after
    # the given timeout, hard shutdown it. We don't need to worry about lost
    # jobs caused by the hard shutdown; there is a reliability list that
    # contains all active jobs, which will be automatically requeued upon next
    # start
    def shutdown
      # Raise the flag and copy the list in a single critical section. Once the
      # flag is visible, #replace_worker stops adding workers, so the copy
      # holds every worker that can still be running. Iterating a copy (rather
      # than the live array) guarantees each worker is notified exactly once
      # even if others remove themselves while we iterate.
      active = @lock.synchronize do
        @needs_to_stop = true
        @workers.dup
      end
      active.reverse_each(&:graceful_shutdown)
      @scheduler.shutdown

      Pallets.logger.info 'Waiting for workers to finish their jobs...'
      # Wait for 10 seconds at most
      10.times do
        return if @lock.synchronize { @workers.empty? }
        sleep 1
      end

      # Only the workers that are still registered need the hard shutdown
      stragglers = @lock.synchronize { @workers.dup }
      stragglers.reverse_each(&:hard_shutdown)
      # Ensure Pallets::Shutdown got propagated and workers finished; if not,
      # their threads will be killed anyway when the manager quits
      sleep 0.5
    end

    # Drops +worker+ from the pool without starting a replacement.
    def remove_worker(worker)
      @lock.synchronize { @workers.delete(worker) }
    end

    # Drops +worker+ from the pool and, unless a shutdown has been requested,
    # adds and starts a fresh worker in its place. The replacement is started
    # while the lock is still held, so #shutdown cannot snapshot the pool
    # between the worker being added and being started.
    def replace_worker(worker)
      @lock.synchronize do
        @workers.delete(worker)

        return if @needs_to_stop

        replacement = Worker.new(self, @backend)
        @workers << replacement
        replacement.start
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
pallets-0.11.0 lib/pallets/manager.rb