Sha256: ab8ffecf803003f223fa9a220dfff86caf70ac9aecdaa6698bdc8099e3187451

Contents?: true

Size: 1.09 KB

Versions: 5

Compression:

Stored size: 1.09 KB

Contents

require_relative 'batch_execution'

module Upperkut
  class Processor
    def initialize(manager)
      @manager  = manager
      @worker   = @manager.worker
      @logger   = @manager.logger
      @strategy = @worker.strategy

      @sleeping_time = 0
    end

    def run
      @thread ||= Thread.new do
        begin
          process
        rescue Exception => e
          @logger.debug(
            action: :processor_killed,
            reason: e,
            stacktrace: e.backtrace
          )

          @manager.notify_killed_processor(self)
        end
      end
    end

    def kill
      return unless @thread

      @thread.raise Upperkut::Shutdown
      @thread.value # wait
    end

    private

    def process
      loop do
        next if @manager.stopped

        if @strategy.process?
          @sleeping_time = 0
          process_batch
          next
        end

        @sleeping_time += sleep(@worker.setup.polling_interval)
        @logger.debug(sleeping_time: @sleeping_time)
      end
    end

    def process_batch
      BatchExecution.new(@worker, @logger).execute
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
upperkut-0.8.1 lib/upperkut/processor.rb
upperkut-0.8.0 lib/upperkut/processor.rb
upperkut-0.7.5 lib/upperkut/processor.rb
upperkut-0.7.4 lib/upperkut/processor.rb
upperkut-0.7.2 lib/upperkut/processor.rb