Sha256: e76b0508a9045fb78d5a358248882331d127f0840a9ccb1f0657f3e96eab92c7
Contents?: true
Size: 1.06 KB
Versions: 1
Compression:
Stored size: 1.06 KB
Contents
module Upperkut class Processor def initialize(manager) @manager = manager @worker = @manager.worker @sleeping_time = 0 end def run @thread ||= Thread.new do process end end def kill return if !@thread @thread.raise Upperkut::Shutdown end private def process loop do if should_process? @sleeping_time = 0 process_batch next end @sleeping_time += sleep(@worker.setup.polling_interval) end end def should_process? buffer_size = @worker.size return false if @manager.stopped return false if buffer_size == 0 # TODO: rename #setup by config buffer_size >= @worker.setup.batch_size || @sleeping_time >= @worker.setup.max_wait end def process_batch begin @sleeping_time = 0 @worker.new.process rescue Exception => ex # Add to retry_queue # if retry_limit is reached # send to dead raise ex end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
upperkut-0.1.2 | lib/upperkut/processor.rb |