Sha256: 571bb024f2e6d87644b4cc5d92a15e63c8fd40e22ff7b951c75beac761b4669d

Contents?: true

Size: 1.26 KB

Versions: 12

Compression:

Stored size: 1.26 KB

Contents

require_relative 'batch_execution'

module Upperkut
  class Processor
    def initialize(manager)
      @manager = manager
      @worker  = @manager.worker
      @logger =  @manager.logger

      @sleeping_time = 0
    end

    def run
      @thread ||= Thread.new do
        begin
          process
        rescue Exception => e
          @logger.debug(
            action: :processor_killed,
            reason: e
          )

          @manager.notify_killed_processor(self)
        end
      end
    end

    def kill
      return unless @thread
      @thread.raise Upperkut::Shutdown
      @thread.value # wait
    end

    private

    def process
      loop do
        if should_process?
          @sleeping_time = 0
          process_batch
          next
        end

        @sleeping_time += sleep(@worker.setup.polling_interval)
        @logger.debug(sleeping_time: @sleeping_time)
      end
    end

    def should_process?
      buffer_size = @worker.size

      return false if @manager.stopped
      return false if buffer_size.zero?

      # TODO: rename #setup by config
      buffer_size >= @worker.setup.batch_size ||
        @sleeping_time >= @worker.setup.max_wait
    end

    def process_batch
      BatchExecution.new(@worker, @logger).execute
    end
  end
end

Version data entries

12 entries across 12 versions & 1 rubygems

Version Path
upperkut-0.6.0 lib/upperkut/processor.rb
upperkut-0.5.2 lib/upperkut/processor.rb
upperkut-0.5.1 lib/upperkut/processor.rb
upperkut-0.5.0 lib/upperkut/processor.rb
upperkut-0.4.6 lib/upperkut/processor.rb
upperkut-0.4.5 lib/upperkut/processor.rb
upperkut-0.4.4 lib/upperkut/processor.rb
upperkut-0.4.3 lib/upperkut/processor.rb
upperkut-0.4.2 lib/upperkut/processor.rb
upperkut-0.4.1 lib/upperkut/processor.rb
upperkut-0.4.0 lib/upperkut/processor.rb
upperkut-0.3.0 lib/upperkut/processor.rb