Sha256: e76b0508a9045fb78d5a358248882331d127f0840a9ccb1f0657f3e96eab92c7

Contents?: true

Size: 1.06 KB

Versions: 1

Compression:

Stored size: 1.06 KB

Contents

module Upperkut
  # Runs a worker's batch-processing loop on a dedicated thread.
  #
  # The processor polls the worker's buffer and triggers a batch when either
  # the buffer holds at least +batch_size+ items, or the buffer is non-empty
  # and the processor has been idle for at least +max_wait+ seconds.
  class Processor
    # @param manager [#worker, #stopped] supplies the worker class to drive
    #   and the stop flag that is consulted before every batch.
    def initialize(manager)
      @manager = manager
      @worker  = @manager.worker
      @sleeping_time = 0
    end

    # Starts the polling loop on a background thread. Subsequent calls reuse
    # the thread created by the first call.
    #
    # @return [Thread] the thread running the loop
    def run
      @thread ||= Thread.new { process }
    end

    # Interrupts the polling thread by raising Upperkut::Shutdown inside it.
    # Does nothing when #run has not been called yet.
    def kill
      return unless @thread

      @thread.raise Upperkut::Shutdown
    end

    private

    # Polling loop: process a batch when one is due, otherwise sleep for one
    # polling interval. Only an exception ends the loop.
    def process
      loop do
        if should_process?
          process_batch
        else
          wait_for_next_poll
        end
      end
    end

    # Sleeps for one polling interval and adds the time actually spent to the
    # idle counter. Kernel#sleep only reports whole seconds, so the elapsed
    # time is measured with the monotonic clock; otherwise sub-second polling
    # intervals would barely advance the counter toward +max_wait+.
    def wait_for_next_poll
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      sleep(@worker.setup.polling_interval)
      @sleeping_time += Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    end

    # @return [Boolean] whether a batch should be processed right now
    def should_process?
      # Check the stop flag first so a stopped processor never queries the
      # worker's buffer.
      return false if @manager.stopped

      buffer_size = @worker.size
      return false if buffer_size.zero?

      # TODO: rename #setup by config
      buffer_size >= @worker.setup.batch_size ||
        @sleeping_time >= @worker.setup.max_wait
    end

    # Resets the idle counter and lets a fresh worker instance process one
    # batch. Any exception raised by the worker propagates to the caller.
    #
    # @return [Object] whatever the worker's #process returns
    def process_batch
      @sleeping_time = 0
      # TODO: add failed batches to a retry queue and, once the retry limit
      # is reached, send them to the dead queue.
      @worker.new.process
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
upperkut-0.1.2 lib/upperkut/processor.rb