Sha256: 2cd19751437af8761f4200222d2bb8303b2d9ed9ae5b830a1edfba8527301606

Contents?: true

Size: 1.92 KB

Versions: 8

Compression:

Stored size: 1.92 KB

Contents

module Kuroko2
  module Command
    # Worker behaviour for the "command-executor" processes.
    #
    # NOTE(review): this module calls +worker_id+ and +logger+ without
    # defining them, so it is presumably mixed into a ServerEngine worker
    # that supplies both -- confirm against the code that boots the server.
    #
    # Each worker drives exactly one command object, selected by worker id:
    # * id 0                       -> Command::Kill
    # * last id (num_workers - 1)  -> Command::Monitor
    # * any other id               -> Command::Shell bound to a Worker record
    module Executor
      DEFAULT_NUM_WORKERS = 4
      NUM_SYSTEM_WORKERS = 2   # master and monitor

      # Total number of worker slots: the NUM_WORKERS environment variable
      # (or DEFAULT_NUM_WORKERS when unset) plus NUM_SYSTEM_WORKERS.
      # The result is memoized, so later changes to ENV are not picked up.
      #
      # @return [Integer]
      def self.num_workers
        @num_workers ||= (ENV['NUM_WORKERS'] || DEFAULT_NUM_WORKERS).to_i + NUM_SYSTEM_WORKERS
      end

      # Builds the stop flag, resolves hostname/queue from the environment
      # (falling back to the machine hostname and Execution::DEFAULT_QUEUE)
      # and picks the command this worker will execute.
      def initialize
        @stop = ServerEngine::BlockingFlag.new

        @hostname = ENV['HOSTNAME'] || Socket.gethostname
        @queue    = ENV['QUEUE'] || Execution::DEFAULT_QUEUE

        @command = if worker_id == 0
                     Command::Kill.new(@hostname, worker_id)
                   elsif worker_id == (Command::Executor.num_workers - 1)
                     Command::Monitor.new(hostname: @hostname, worker_id: worker_id)
                   else
                     # Only shell workers are backed by a Worker record; it is
                     # looked up (or initialized) by hostname, worker id and queue.
                     @worker = Worker.where(hostname: @hostname, worker_id: worker_id, queue: @queue).first_or_initialize
                     @worker.update!(suspendable: true)
                     Command::Shell.new(hostname: @hostname, worker_id: worker_id, worker: @worker, queue: @queue)
                   end
      end

      # Main loop: repeatedly executes the command until the stop flag's
      # +wait+ returns a truthy value.
      #
      # Any exception (including non-StandardError ones) is logged at fatal
      # level with an indented backtrace and then re-raised unchanged.
      def run
        Kuroko2.logger = logger
        Kuroko2.logger.info "[#{@hostname}-#{worker_id}] Start worker"
        toggle_worker_status(true)
        $0 = "command-executor (worker_id=#{worker_id} command=#{@command.class.name})"

        # Sleeps worker_id seconds before polling -- presumably to stagger
        # the workers' start-up; confirm the intent before changing.
        sleep worker_id

        # Waits between 1 and 2 seconds (random jitter) between executions.
        until @stop.wait(1 + rand)
          @command.execute
        end
      rescue Exception => e
        # Array() guards against a nil backtrace so that logging can never
        # raise NoMethodError and mask the original error.
        backtrace = Array(e.backtrace).map { |trace| "    #{trace}" }.join("\n")
        Kuroko2.logger.fatal("[#{@hostname}-#{worker_id}] #{e.class}: #{e.message}\n#{backtrace}")

        raise
      end

      # Marks the worker as no longer working and sets the stop flag.
      #
      # The flag is set inside +ensure+: if logging or the status update
      # raises, the flag is still set before the error propagates, so the
      # +run+ loop is not left polling forever.
      #
      # @return the result of setting the stop flag
      def stop
        begin
          Kuroko2.logger.info "[#{@hostname}-#{worker_id}] Stop worker"
          toggle_worker_status(false)
        ensure
          flag_result = @stop.set!
        end

        flag_result
      end

      private

      # Persists the +working+ state on the Worker record.
      # Only Command::Shell workers own a record; for every other command
      # this is a no-op that returns false.
      #
      # @param status [Boolean] value assigned to +@worker.working+
      # @return the result of +@worker.save+, or false when skipped
      def toggle_worker_status(status)
        return false unless @command.kind_of?(Command::Shell)

        @worker.working = status
        @worker.save
      end

    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
kuroko2-0.8.0 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.7.0 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.6.0 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.5.2 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.5.1 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.5.0 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.4.6 lib/autoload/kuroko2/command/executor.rb
kuroko2-0.4.5 lib/autoload/kuroko2/command/executor.rb